Category: Services

  • SEO for Web Apps: How to Boost Your Search Rankings

    The responsibilities of a web developer go beyond designing and developing a web application; they also include adding the right set of features that help the site attract more traffic. One way of getting traffic is to ensure your web pages are listed in the top search results on Google. Search engines consider certain factors while ranking a web page (covered in this guide below), and accommodating these factors in your web app is called search engine optimization.

    A web app that is search engine optimized loads faster, has a good user experience, and is shown in the top search results of Google. If you want your web app to have these features, then this essential guide to SEO will provide you with a checklist to follow when working on SEO improvements.

    Key Facts:

    • 75% of visitors only visit the first three links listed and results from the second page get only 0.78% of clicks.
    • 95% of visitors visit only the links from the first page of Google.
    • Search engines give 300% more traffic than social media.
    • 8% of searches from browsers are in the form of a question.
    • 40% of visitors will leave a website if it takes more than 3 seconds to load. And more shocking is that 80% of those visitors will not visit the same site again.

    How Search Works:

    1. Crawling: Crawling is performed by automated scripts often referred to as web crawlers, web spiders, or Googlebot, and sometimes simply shortened to crawlers. These scripts start from past crawls and look for the sitemap file, which is found at the root directory of the web application. We will cover more on the sitemap later. For now, just understand that the sitemap file has all the links to your website, ordered hierarchically. Crawlers add those links to the crawl queue so that they can be crawled later. Crawlers pay special attention to newly added sites and frequently updated/visited sites, and they use several algorithms to decide how often an existing site should be recrawled.
    2. Indexing: Let us first understand what indexing means. Indexing is collecting, parsing, and storing data to enable a super-fast response to queries. Google follows the same steps to perform web indexing: it visits each page from the crawl queue, analyzes what the page is about along with its content, images, and videos, then parses the analyzed result and stores it in its database, called the Google Index.
    3. Serving: When a user makes a search query on Google, Google tries to determine the highest quality result and considers other criteria before serving it, like the user’s location, previously submitted data, language, and device (desktop/mobile). That is why responsiveness is also considered for SEO. Non-responsive sites might rank higher on desktop but lower on mobile because, while analyzing the page content, these bots see the pages exactly as the user sees them and assign the ranking accordingly.

    Factors that affect SEO ranking:

    1. Sitemap: The sitemap file comes in two types, HTML and XML, and both files are placed at the root of the web app. The HTML sitemap guides users around the website pages; it lists the pages hierarchically to help users understand the flow of the website. The XML sitemap helps search engine bots crawl the pages of the site and understand the website structure. It carries several fields that help the bots crawl intelligently:

    loc: The URL of the webpage.

    lastmod: When the content of the URL got updated.

    changefreq: How often the content of the page gets changed.

    priority: It has the range from 0 to 1—0 represents the lowest priority, and 1 represents the highest. 1 is generally given to the home or landing page. Setting 1 to every URL will cause search engines to ignore this field.


    The below example shows how the URL will be written along with the fields.
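
    A minimal sketch of such a sitemap.xml entry (the domain, dates, and values are illustrative):

    <?xml version="1.0" encoding="UTF-8"?>
    <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
      <url>
        <loc>https://www.example.com/</loc>
        <lastmod>2021-06-15</lastmod>
        <changefreq>weekly</changefreq>
        <priority>1.0</priority>
      </url>
      <url>
        <loc>https://www.example.com/blog/seo-for-web-apps</loc>
        <lastmod>2021-06-10</lastmod>
        <changefreq>monthly</changefreq>
        <priority>0.8</priority>
      </url>
    </urlset>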


    2. Meta tags: Meta tags are very important because they indirectly affect the SEO ranking. They contain important information about the web page, and this information is shown as the snippet in Google search results. Users see this snippet and decide whether to click the link, and search engines consider the click-through rate when serving the results. Meta tags are not visible to the user on the web page, but they are part of the HTML code.

    A few important meta tags for SEO are:

    • Meta title: This is the primary content shown by the search results, and it plays a huge role in deciding the click rates because it gives users a quick glance at what this page is about. It should ideally be 50-60 characters long, and the title should be unique for each page.
    • Meta description: It summarizes or gives an overview of the page content in short. The description should be precise and of high quality. It should include some targeted keywords the user will likely search and be under 160 characters.
    • Meta robots: It tells search engines whether to index and crawl web pages. The four values it can contain are index, noindex, follow, or nofollow. If these values are not used correctly, then it will negatively impact the SEO.
      index/noindex: Tells whether to index the web page.
      follow/nofollow: Tells whether to crawl links on the web page.
    • Meta viewport: It signals to search engines that the web page is responsive to different screen sizes, and it instructs the browser on how to render the page. The presence of this tag helps search engines understand that the website is mobile-friendly, which matters because Google ranks results differently in mobile search. If the desktop version is opened on a mobile device, the user will most likely close the page, sending a negative signal to Google that this page has some undesirable content and lowering the ranking. This tag should be present on all the web pages.

      Let us look at what a Velotio page would look like with and without the meta viewport tag.


    • Meta charset: It sets the character encoding of the webpage; in simple terms, it tells the browser how the text should be displayed on the page. Wrong character encoding will make content hard to read for search engines and will lead to a bad user experience. Use UTF-8 character encoding wherever possible.
    • Meta keywords: Search engines don’t consider this tag anymore. Bing considers this tag as spam. If this tag is added to any of the web pages, it may work against SEO. It is advisable not to have this tag on your pages.
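
    Putting these together, a minimal <head> could look like the sketch below (the title, description, and values are illustrative):

    <head>
      <meta charset="UTF-8">
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <meta name="robots" content="index, follow">
      <title>SEO for Web Apps: How to Boost Your Search Rankings</title>
      <meta name="description" content="An essential checklist covering sitemaps, meta tags, headers, robots.txt, and page speed to improve search rankings.">
    </head>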

    3. Usage of Headers / Hierarchical content: Header tags are the heading tags that matter for both user readability and search engines. Headers organize the content of the web page so that it doesn’t look like a plain wall of text. Bots check how well the content is organized and assign the ranking accordingly. Headers make the content user-friendly, scannable, and accessible. Header tags range from h1 to h6, with h1 carrying the most importance and h6 the least. Googlebot mainly considers h1 because it is typically the title of the page and gives a brief idea of what the page content is about.

    If Velotio’s different pages of content were written on one big page (not good advice, just for example), then the hierarchy could be structured like the sketch below.
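
    Since the original snapshot is an image, here is a rough HTML sketch of such a hierarchy (the page names are illustrative):

    <h1>Velotio Technologies</h1>
      <h2>Services</h2>
        <h3>Cloud and DevOps</h3>
        <h3>Data Engineering</h3>
      <h2>Blog</h2>
        <h3>SEO for Web Apps</h3>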

    4. Usage of Breadcrumb: Breadcrumbs are navigational elements that let users track which page they are currently on. Search engines find them helpful for understanding the structure of the website. Breadcrumbs lower the bounce rate by encouraging users to explore other pages of the website. They are usually found at the top of the page in a slightly smaller font. Using breadcrumbs is always recommended if your site has deeply nested pages.

    If we refer to the MDN pages, then a hierarchical breadcrumb can be found at the top of the page.

    5. User Experience (UX): UX has become an integral component of SEO. A good UX always makes your users stay longer, which lowers the bounce rate and makes them visit your site again. Google recognizes this stay time and click rates and considers the site as more attractive to users, ranking it higher in the search results. Consider the following points to have a good user experience.

    1. Divide content into sections, not just a plain wall of text
    2. Use hierarchical font sizes
    3. Use images/videos that summarize the content
    4. Good theme and color contrast
    5. Responsiveness (desktop/tablet/mobile)

    6. Robots.txt: The robots.txt file tells crawlers which pages of the site they should not access. It contains directives that tell the bots not to crawl or index the disallowed pages. The best example of a page that should not be crawled is the payment gateway page. Robots.txt is kept at the root of the web app and should be public. Refer to Velotio’s robots.txt file to know more about it. User-Agent: * means the given directives apply to all the bots that support robots.txt.
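
    A minimal illustrative robots.txt (the disallowed paths are hypothetical):

    User-Agent: *
    Disallow: /checkout/
    Disallow: /payment-gateway/
    Allow: /

    Sitemap: https://www.example.com/sitemap.xml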

    7. Page speed: Page speed is the time it takes for a page to be fully displayed and interactive. Google also considers page speed an important factor for SEO. As we have seen in the facts section, users tend to close a site if it takes longer than 3 seconds to load. To Googlebot, this is unfavorable to the user experience, and it will lower the ranking. We will go through some tools later in this section to measure the loading speed of a page, but if your site loads slowly, then look into the recommendations below.

    • Image compression: In a consumer-oriented website, images contribute around 50-90% of the page’s weight, so they must load quickly. Use compressed images, which lowers the file size without compromising the quality. Cloudinary is a platform that does this job decently.
      If your image is 700×700 but is shown in a 300×300 container, then rather than scaling it down with CSS, load the image at 300×300 in the first place; the browser doesn’t need to download such a big image, and reducing it through CSS takes extra time. All this time can be avoided by loading an image of the required size.
      By utilizing deferring/lazy image loading, images are downloaded when they are needed as the user scrolls on the webpage. Doing this allows the images to not be loaded at once, and browsers will have the bandwidth to perform other tasks.
      Using sprite images is also an effective way to reduce the HTTP requests by combining small icons into one sprite image and displaying the section we want to show. This will save load time by avoiding loading multiple images.
    • Code optimization: Every developer should consider reusability while writing code, which helps reduce the code size. Nowadays, most websites are built using bundlers. Use bundle analyzers to find which pieces of code are inflating the bundle size. Most bundlers already minify the code while generating the build artifacts.
    • Removing render-blocking resources: Browsers build the DOM tree by parsing HTML. During this process, if the parser finds any scripts, the creation of the DOM tree is paused and script execution starts. This increases the page load time. To load scripts without blocking DOM creation, use async and defer on your script tags and load scripts at the end of the body (see the snippet after this list). Keep in mind, though, that some scripts need to be loaded in the head, like the Google Analytics script. Don’t apply this suggestion blindly, as it may cause some unusual behavior on your site.
    • Implementing a Content Distribution Network (CDN): A CDN shortens load times by serving resources from the server geographically closest to the user.
    • Good hosting platform: Optimizing images and code alone cannot always improve page speed. Budget-friendly servers host millions of other websites on shared infrastructure, which can prevent your site from loading quickly. So, it is always recommended to use a premium hosting service or a dedicated server.
    • Implement caching: If resources are cached in the browser, they are not fetched from the server; the browser picks them from the cache instead. It is important to set an expiration time when configuring the cache, and caching should only be applied to resources that are not updated frequently.
    • Reducing redirects: Each redirect adds an extra HTTP request-response cycle to the page load, so it is advisable not to use too many redirects.
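
    As referenced above, a quick sketch of non-blocking script loading (the script URLs are illustrative):

    <!-- async: downloads in parallel and executes as soon as it is ready (order not guaranteed) -->
    <script async src="https://www.example.com/js/analytics.js"></script>

    <!-- defer: downloads in parallel but executes only after the HTML is parsed, in document order -->
    <script defer src="https://www.example.com/js/app.js"></script>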

    Some tools help us find the score of our website and point out the areas that can be improved. These tools consider SEO, user experience, and accessibility while calculating the score, and they report results in some technical terms. Let us understand them in short:

    1. Time to first byte: It is the time between the request and the arrival of the first byte of the response, i.e., the moment the web page starts loading. The white screen we see for some time after landing on a page is largely TTFB at work.

    2. First contentful paint: It represents when the user sees something on the web page.

    3. First meaningful paint: It tells when the user understands the content, like text/images on the web page.

    4. First CPU idle: It represents the moment when the site has loaded enough information for it to be able to handle the user’s first input.

    5. Largest contentful paint: It represents the moment when the largest content element above the page’s fold (visible without scrolling) has finished rendering.

    6. Time to interactive: It represents the moment when the web page is fully interactive.

    7. Total blocking time: It is the total amount of time during which the page’s main thread was blocked and could not respond to user input.

    8. Cumulative layout shift: It measures how much visible elements unexpectedly shift position while the page is being rendered; it is a score rather than a time.

    Below are some popular tools we can use for performance analysis:

    1. Page speed insights: This assessment tool provides the score and opportunities to improve.

    2. Web page test: This monitoring tool lets you analyze each resource’s loading time.

    3. Gtmetrix: This is an assessment tool similar to Lighthouse that gives some more information, and it also lets us set the test location.

    Conclusion:

    We have seen what SEO is, how it works, and how we can improve it by going through sitemap, meta tags, heading tags, robots.txt, breadcrumb, user experience, and finally the page load speed. For a business-to-consumer application, SEO is highly important. It lets you drive more traffic to your website. Hopefully, this basic guide will help you improve SEO for your existing and future websites.


  • Elasticsearch 101: Fundamentals & Core Components

    Elasticsearch is currently the most popular way to implement free text search and analytics in applications. It is highly scalable and can easily manage petabytes of data. It supports a variety of use cases, like letting users easily search through any portal, collecting and analyzing log data, and building business intelligence dashboards to quickly analyze and visualize data.

    This blog acts as an introduction to Elasticsearch and covers the basic concepts of clusters, nodes, indices, documents, and shards.

    What is Elasticsearch?

    Elasticsearch (ES) is a combination of an open-source, distributed, highly scalable data store and Lucene, a search engine that supports extremely fast full-text search. It is a beautifully crafted piece of software that hides the internal complexities and provides full-text search capabilities with simple REST APIs. Elasticsearch is written in Java with Apache Lucene at its core. It should be clear that Elasticsearch is not like a traditional RDBMS. It is not suitable for your transactional database needs, and hence, in my opinion, it should not be your primary data store. It is a common practice to use a relational database as the primary data store and inject only the required data into Elasticsearch.

    Elasticsearch is meant for fast text search. There are several functionalities, which make it different from RDBMS. Unlike RDBMS, Elasticsearch stores data in the form of a JSON document, which is denormalized and doesn’t support transactions, referential integrity, joins, and subqueries.

    Elasticsearch works with structured, semi-structured, and unstructured data as well. In the next section, let’s walk through the various components in Elasticsearch.

    Elasticsearch Components

    Cluster

    One or more servers collectively providing indexing and search capabilities form an Elasticsearch cluster. The cluster size can vary from a single node to thousands of nodes, depending on the use cases.

    Node

    Node is a single physical or virtual machine that holds full or part of your data and provides computing power for indexing and searching your data. Every node is identified with a unique name. If the node identifier is not specified, a random UUID is assigned as a node identifier at the startup. Every node configuration has the property `cluster.name`. The cluster will be formed automatically with all the nodes having the same `cluster.name` at startup.
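
    A minimal sketch of the relevant settings in elasticsearch.yml (the names are illustrative):

    # elasticsearch.yml
    cluster.name: my-application-cluster   # nodes sharing this name form one cluster
    node.name: node-1                      # optional; a random UUID is assigned if omitted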

    A node has to accomplish several duties such as:

    • storing the data
    • performing operations on data (indexing, searching, aggregation, etc.)
    • maintaining the health of the cluster

    Each node in a cluster can do all these operations. Elasticsearch provides the capability to split responsibilities across different nodes. This makes it easy to scale, optimize, and maintain the cluster. Based on the responsibilities, the following are the different types of nodes that are supported:

    Data Node

    Data node is the node that has storage and computation capability. Data node stores the part of data in the form of shards (explained in the later section). Data nodes also participate in the CRUD, search, and aggregate operations. These operations are resource-intensive, and hence, it is a good practice to have dedicated data nodes without having the additional load of cluster administration. By default, every node of the cluster is a data node.

    Master Node

    Master nodes are reserved to perform administrative tasks. Master nodes track the availability/failure of the data nodes. The master nodes are responsible for creating and deleting the indices (explained in the later section).

    This makes the master node a critical part of the Elasticsearch cluster. It has to be stable and healthy. A single master node for a cluster is certainly a single point of failure. Elasticsearch provides the capability to have multiple master-eligible nodes. All the master eligible nodes participate in an election to elect a master node. It is recommended to have a minimum of three nodes in the cluster to avoid a split-brain situation. By default, all the nodes are both data nodes as well as master nodes. However, some nodes can be master-eligible nodes only through explicit configuration.
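
    As a rough sketch, dedicated roles can be assigned in elasticsearch.yml (shown in the pre-7.9 style of settings; newer versions use node.roles instead):

    # Dedicated master-eligible node
    node.master: true
    node.data: false

    # Dedicated data node
    node.master: false
    node.data: true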

    Coordinating-Only Node

    Any node that is neither a master-eligible node nor a data node is a coordinating-only node. Coordinating nodes act as smart load balancers. They are exposed to end-user requests and appropriately route requests between data nodes and master nodes.

    To take an example, a user’s search request is sent to different data nodes. Each data node searches locally and sends the result back to the coordinating node. The coordinating node aggregates the results and returns them to the user.

    There are a few concepts that are core to Elasticsearch. Understanding these basic concepts will tremendously ease the learning process.

    Index

    Index is a container to store data, similar to a database in the relational world. An index contains a collection of documents that have similar characteristics or are logically related. If we take the example of an e-commerce website, there will be one index for products, one for customers, and so on. Indices are identified by a lowercase name, and the index name is required to perform add, update, and delete operations on the documents.

    Type

    Type is a logical grouping of the documents within the index. In the previous example of the product index, we can further group documents into types, like electronics, fashion, furniture, etc. Types are defined on the basis of documents having similar properties in them. It isn’t easy to decide when to use a type over an index. Indices have more overhead, so sometimes it is better to use different types in the same index for better performance. There are a couple of restrictions on using types as well. For example, two fields having the same name in different types of documents should be of the same datatype (string, date, etc.).

    Document

    Document is the basic unit of information indexed by Elasticsearch. A document is represented in JSON format. We can add as many documents as we want to an index. The following snippet shows how to create a document of type mobile in the index store. We will cover more about the individual fields of the document in the Mapping Types section.

    HTTP POST <hostname:port>/store/mobile/
    {
      "name": "Motorola G5",
      "model": "XT3300",
      "release_date": "2016-01-01",
      "features": "16 GB ROM | Expandable Upto 128 GB | 5.2 inch Full HD Display | 12MP Rear Camera | 5MP Front Camera | 3000 mAh Battery | Snapdragon 625 Processor",
      "ram_gb": "3",
      "screen_size_inches": "5.2"
    }

    Mapping Types

    To create different types in an index, we need mapping types (or simply mappings) to be specified during index creation. Mappings can be defined as a list of directives given to Elasticsearch about how the data is supposed to be stored and retrieved. It is important to provide mapping information at the time of index creation based on how we want to retrieve our data later. In the context of relational databases, think of mappings as a table schema.

    Mapping provides information on how to treat each JSON field. For example, the field can be of type date, geolocation, or person name. Mappings also allow specifying which fields will participate in the full-text search, and specify the analyzers used to transform and decorate data before storing into an index. If no mapping is provided, Elasticsearch tries to identify the schema itself, known as Dynamic Mapping. 

    Each mapping type has Meta Fields and Properties. The snippet below shows the mapping of the type mobile.

    {
      "mappings": {
        "mobile": {
          "properties": {
            "name": { "type": "keyword" },
            "model": { "type": "keyword" },
            "release_date": { "type": "date" },
            "features": { "type": "text" },
            "ram_gb": { "type": "short" },
            "screen_size_inches": { "type": "float" }
          }
        }
      }
    }

    Meta Fields

    As the name indicates, meta fields store additional information about the document. Meta fields are meant mostly for internal usage, and it is unlikely that the end-user has to deal with them. Meta field names start with an underscore. There are around ten meta fields in total. We will talk about some of them here:

    _index

    It stores the name of the index the document belongs to. This is used internally to store/search the document within an index.

    _type

    It stores the type of the document. To get better performance, it is often included in search queries.

    _id

    This is the unique id of the document. It is used to access a specific document directly over the HTTP GET API.

    _source

    This holds the original JSON document before any analyzers/transformations are applied. It is important to note that Elasticsearch can only query fields that are indexed (i.e., fields a mapping is provided for). The _source field is not indexed and hence can’t be queried on, but it can be included in the final search result.
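
    As a rough illustration, a search can still return selected _source fields even though _source itself is not queried (the field names are taken from the mapping shown earlier):

    HTTP POST <hostname:port>/store/_search
    {
      "query": { "match": { "features": "camera" } },
      "_source": ["name", "release_date"]
    }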

    Fields Or Properties

    The list of fields specifies which JSON fields of the document should be included in a particular type. In the e-commerce website example, mobile can be a type. It will have fields like operating_system, camera_specification, ram_size, etc.

    Fields also carry the data type information with them. This directs Elasticsearch to treat the specific fields in a particular way of storing/searching data. Data types are similar to what we see in any other programming language. We will talk about a few of them here.

    Simple Data Types

    Text

    This data type is used to store full text, like a product description. These fields participate in full-text search. Fields of this type are analyzed while storing, which enables searching them by the individual words in them. Such fields are not used in sorting and aggregation queries.

    Keywords

    This type is also used to store text data, but unlike Text, it is stored without being analyzed. It is suitable for storing information like a user’s mobile number, city, age, etc. These fields are used in filter, aggregation, and sorting queries; for example, list all users from a particular city and filter them by age.

    Numeric

    Elasticsearch supports a wide range of numeric types: long, integer, short, byte, double, and float.

    There are a few more data types to support date, boolean (true/false, on/off, 1/0), IP (to store IP addresses).

    Special Data Types

    Geo Point

    This data type is used to store geographical location. It accepts latitude and longitude pair. For example, this data type can be used to arrange the user’s photo library by their geographical location or graphically display the locations trending on social media news.

    Geo Shape

    It allows storing arbitrary geometric shapes like rectangle, polygon, etc.

    Completion Suggester

    This data type is used to provide auto-completion feature over a specific field. As the user types certain text, the completion suggester can guide the user to reach particular results.

    Complex Data Type

    Object

    If you know JSON well, this concept won’t be new for you. Elasticsearch also allows storing nested JSON object structure as a document.

    Nested

    The Object data type is not that useful due to its underlying data representation in the Lucene index. A Lucene index does not support inner JSON objects, so ES flattens the original JSON to make it compatible with the Lucene index. As a result, fields of the multiple inner objects get merged into one, leading to wrong search results. Most of the time, you may want to use the Nested data type over Object.
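
    A rough sketch of the flattening problem, using a hypothetical document with an array of inner objects:

    {
      "name": "Motorola G5",
      "variants": [
        { "color": "grey", "ram_gb": 3 },
        { "color": "gold", "ram_gb": 4 }
      ]
    }

    With the object type, Lucene internally flattens this to variants.color: ["grey", "gold"] and variants.ram_gb: [3, 4], so a query for a grey variant with 4 GB RAM wrongly matches. The nested type indexes each inner object separately and avoids this.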

    Shards

    Shards enable Elasticsearch to scale horizontally. An index can store millions of documents and occupy terabytes of data, which can cause problems with performance, scalability, and maintenance. Let’s see how shards help achieve scalability.

    Indices are divided into multiple units called shards. A shard is a full-featured subset of an index. Shards of the same index can reside on the same or different nodes of the cluster. The number of shards decides the degree of parallelism for search and indexing operations, and shards allow the cluster to grow horizontally. The number of shards per index can be specified at the time of index creation; by default, 5 shards are created. However, once the index is created, the number of shards cannot be changed. To change the number of shards, the data has to be reindexed.
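
    A sketch of specifying the shard (and replica) count at index creation time:

    HTTP PUT <hostname:port>/store
    {
      "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
      }
    }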

    Replication

    Hardware can fail at any time. To ensure fault tolerance and high availability, ES provides a feature to replicate the data: shards can be replicated. A shard that is being copied is called the primary shard, and the copy of the primary shard is called a replica shard, or simply a replica. Like the number of shards, the number of replicas can also be specified at the time of index creation. Replication serves two purposes:

    • High Availability – A replica is never created on the same node where its primary shard is present. This ensures that data remains available through the replica shard even if the whole node fails.
    • Performance – Replicas can also serve search requests. Search queries are executed in parallel across the replicas.

    To summarize, to achieve high availability and performance, the index is split into multiple shards. In a production environment, multiple replicas are created for every index. In a replicated index, only primary shards can serve write requests; however, all the shards (the primary shard as well as the replica shards) can serve read/query requests. The replication factor is defined at the time of index creation and can be changed later if required. Choosing the number of shards is an important exercise because, once defined, it can’t be changed. In critical scenarios, changing the number of shards requires creating a new index with the required shards and reindexing the old data.
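
    Unlike the shard count, the replication factor can be changed on a live index, for example:

    HTTP PUT <hostname:port>/store/_settings
    {
      "index": {
        "number_of_replicas": 2
      }
    }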

    Summary

    In this blog, we have covered the basic but important aspects of Elasticsearch. In the following posts, I will talk about how indexing & searching works in detail. Stay tuned!

  • Improving Elasticsearch Indexing in the Rails Model using Searchkick

    Searching has become a prominent feature of any web application, and a relevant search feature requires a robust search engine. The search engine should be capable of performing full-text search, autocompletion, suggestions, spelling corrections, fuzzy search, and analytics.

    Elasticsearch, a distributed, fast, and scalable search and analytic engine, takes care of all these basic search requirements.

    The focus of this post is using a few approaches with Elasticsearch in our Rails application to reduce time latency for web requests. Let’s review one of the best ways to improve the Elasticsearch indexing in Rails models by moving them to background jobs.

    In a Rails application, Elasticsearch can be integrated with any of several popular gems, such as Searchkick, elasticsearch-rails (elasticsearch-model), and Chewy.

    We can continue with any of these gems mentioned above. But for this post, we will be moving forward with the Searchkick gem, which is a much more Rails-friendly gem.

    The default Searchkick option uses the model callbacks to sync the data into the respective Elasticsearch index. Because this happens inside the callbacks, any request that creates or updates a resource takes additional time to process.

    The below image shows logs from a Rails application, captured for an update request of a user record. We have added a print statement before Elasticsearch tries to sync in the Rails model so that it helps identify from the logs where the indexing has started. These logs show that the last two queries were executed for indexing the data in the Elasticsearch index.

    Since the Elasticsearch sync happens while updating a user record, we can conclude that the user update request will take additional time to account for the Elasticsearch sync.

    Below is the request flow diagram:

    From the request flow diagram, we can say that the end-user must wait for steps 3 and 4 to be completed. Step 3 fetches the child objects’ details from the database.

    To tackle the problem, we can move the Elasticsearch indexing to the background jobs. Usually, for Rails apps in production, there are separate app servers, database servers, background job processing servers, and Elasticsearch servers (in this scenario).

    This is how the request flow looks when we move Elasticsearch indexing:

    Let’s get to coding!

    For demo purposes, we will have a Rails app with models: `User` and `Blogpost`. The stack used here:

    • Rails 5.2
    • Elasticsearch 6.6.7
    • MySQL 5.6
    • Searchkick (gem for writing Elasticsearch queries in Ruby)
    • Sidekiq (gem for background processing)

    This approach does not require any specific version of Rails, Elasticsearch, or MySQL. Moreover, this approach is database agnostic. You can go through the code from this GitHub repo for reference.

    Let’s take a look at the user model with Elasticsearch index.

    # == Schema Information
    #
    # Table name: users
    #
    #  id            :bigint           not null, primary key
    #  name          :string(255)
    #  email         :string(255)
    #  mobile_number :string(255)
    #  created_at    :datetime         not null
    #  updated_at    :datetime         not null
    #
    class User < ApplicationRecord
     searchkick
    
     has_many :blogposts
     def search_data
       {
         name: name,
         email: email,
         total_blogposts: blogposts.count,
         last_published_blogpost_date: last_published_blogpost_date
       }
     end
     ...
    end

    Anytime a user object is inserted, updated, or deleted, Searchkick reindexes the data in the Elasticsearch user index synchronously.

    Searchkick already provides four ways to sync Elasticsearch index:

    • Inline (default)
    • Asynchronous
    • Queuing
    • Manual

    For more detailed information on this, refer to this page. In this post, we are looking into the manual approach to reindex the model data.

    To manually reindex, the user model will look like:

    class User < ApplicationRecord
     searchkick callbacks: false
    
     def search_data
       ...
     end
    end

    Now, we will need to define a callback that can sync the data to the Elasticsearch index. Typically, this callback must be written in all the models that have the Elasticsearch index. Instead, we can write a common concern and include it to required models.

    Here is what our concern will look like:

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
    
     included do
       after_commit :reindex_model
       def reindex_model
         ElasticsearchWorker.perform_async(self.id, self.class.name)
       end
     end
    end

    In the above active support concern, we have called the Sidekiq worker named ElasticsearchWorker. After adding this concern, don’t forget to include the Elasticsearch indexer concern in the user model, like so:

    include ElasticsearchIndexer

    Now, let’s see the Elasticsearch Sidekiq worker:

    class ElasticsearchWorker
     include Sidekiq::Worker
     def perform(id, klass)
       begin
         klass.constantize.find(id.to_s).reindex
       rescue => e
         # Handle exception
       end
     end
    end

    That’s it, we’ve done it. Cool, huh? Now, whenever a create, update, or delete web request comes in for a user, a background job will be created. The background job can be seen in the Sidekiq web UI at localhost:3000/sidekiq.

    Now, there is a little problem in the Elasticsearch indexer concern: a job is enqueued even when nothing has actually changed. To reproduce this, go to your user edit page, click save without changing anything, and look at localhost:3000/sidekiq; a job will still be queued.

    We can handle this case by tracking the dirty attributes. 

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
     included do
       after_commit :reindex_model
       def reindex_model
         return if self.previous_changes.keys.blank?
         ElasticsearchWorker.perform_async(self.id, self.class.name)
       end
     end
    end

    Furthermore, there are a few more areas of improvement. Suppose you update a field of the user model that is not part of the Elasticsearch index; the Elasticsearch worker Sidekiq job will still get created and reindex the associated model object. This can be modified to create the Elasticsearch indexing worker Sidekiq job only if the Elasticsearch index fields are updated.

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
     included do
       after_commit :reindex_model
       def reindex_model
         updated_fields = self.previous_changes.keys
        
         # For getting ES Index fields you can also maintain constant
         # on model level or get from the search_data method.
         es_index_fields = self.search_data.stringify_keys.keys
         return if (updated_fields & es_index_fields).blank?
         ElasticsearchWorker.perform_async(self.id, self.class.name)
       end
     end
    end

    Conclusion

    Moving the Elasticsearch indexing to background jobs is a great way to boost the performance of the web app by reducing the response time of any web request. Implementing this approach for every model would not be ideal. I would recommend this approach only if the Elasticsearch index data are not needed in real-time.

    Since the execution of background jobs depends on the number of jobs queued, it might take time for changes to be reflected in the Elasticsearch index if there are lots of jobs queued up. To solve this problem to some extent, the Elasticsearch indexing jobs can be added to a high-priority queue. Also, make sure the app server and the background job processing server are separate; this approach works best when they are.
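
    As a rough sketch (the queue name is illustrative), the worker can be routed to a dedicated queue that Sidekiq is configured to poll with a higher weight:

    class ElasticsearchWorker
      include Sidekiq::Worker
      # Route reindex jobs to a dedicated queue; giving this queue a higher
      # weight in config/sidekiq.yml (e.g., "- [es_indexing, 5]") makes Sidekiq
      # pick these jobs up ahead of the default queue.
      sidekiq_options queue: :es_indexing

      def perform(id, klass)
        klass.constantize.find(id).reindex
      rescue => e
        # Handle exception
      end
    end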

  • Managing a TLS Certificate for Kubernetes Admission Webhook

    A Kubernetes admission controller is a great way of handling an incoming request, whether to add or modify fields or to deny the request as per the rules/configuration defined. To extend the native functionalities, these admission webhook controllers call a custom-configured HTTP callback (webhook server) for additional checks. But the API server only communicates with admission webhook servers over HTTPS and needs the CA information for the TLS certificate. This poses the problem of how to handle the webhook server certificate and how to pass the CA information to the API server automatically.

    One way to handle the TLS certificate and CA is to use cert-manager. However, cert-manager is itself a big application and consists of many CRDs to handle its operation, and it is not a good idea to install cert-manager just to handle the admission webhook TLS certificate and CA. The second and possibly easier way is to use a self-signed certificate and handle the CA on our own using an init container. This eliminates the dependency on other applications, like cert-manager, and gives us the flexibility to control our application flow.

    How is a custom admission webhook written? We will not cover this in depth; only a basic overview of admission controllers and how they work will be given. The main focus of this blog is to cover the second approach step by step: handling the admission webhook server TLS certificate and CA on our own using an init container so that the API server can communicate with our custom webhook.


    Prerequisites:

    • Knowledge of Kubernetes admission controllers, MutatingAdmissionWebhook, and ValidatingAdmissionWebhook
    • Knowledge of Kubernetes resources like pods and volumes

    Basic Overview: 

    Admission controllers intercept requests to the Kubernetes API server before objects are persisted in etcd. These controllers are bundled and compiled into the kube-apiserver binary. Among them, there are two special controllers: MutatingAdmissionWebhook and ValidatingAdmissionWebhook. MutatingAdmissionWebhook, as the name suggests, mutates/adds/modifies fields in the request object by creating a patch, and ValidatingAdmissionWebhook validates the request by checking whether the request object fields are valid, whether the operation is allowed, etc., as per custom logic.

    The main reason for these types of controllers is to dynamically add new checks along with the native existing checks in Kubernetes to allow a request, just like the plug-in model. To understand this more clearly, let’s say we want all the deployments in the cluster to have certain required labels. If the deployment does not have required labels, then the create deployment request should be denied. This functionality can be achieved in two ways: 

    1) Add these extra checks natively in Kubernetes API server codebase, compile a new binary, and run with the new binary. This is a tedious process, and every time new checks are needed, a new binary is required. 

    2) Create a custom admission webhook, a simple HTTP server, for these additional checks, and register this admission webhook with the API server using the AdmissionRegistration API. To register, two configurations, MutatingWebhookConfiguration and ValidatingWebhookConfiguration, are used. The second approach is recommended, and it’s quite easy as well. We will be discussing it here in detail.

    Custom Admission Webhook Server:

    As mentioned earlier, a custom admission webhook server is a simple HTTP server with TLS that exposes endpoints for mutation and validation. Depending upon the endpoint hit, the corresponding handler processes mutation or validation. Once a custom webhook server is ready and deployed in a cluster as a deployment along with a webhook service, the next part is to register it with the API server so that the API server can communicate with the custom webhook server. To register, MutatingWebhookConfiguration and ValidatingWebhookConfiguration are used. These configurations have a section for the custom webhook’s information.

    apiVersion: admissionregistration.k8s.io/v1
    kind: MutatingWebhookConfiguration
    metadata:
      name: mutation-config
    webhooks:
      - admissionReviewVersions:
        - v1beta1
        name: mapplication.kb.io
        clientConfig:
          caBundle: ${CA_BUNDLE}
          service:
            name: webhook-service
            namespace: default
            path: /mutate
        rules:
          - apiGroups:
              - apps
            apiVersions:
              - v1
            operations:
              - CREATE
            resources:
              - deployments
        sideEffects: None

    Here, the service field gives information about the name, namespace, and endpoint path of the running webhook server. An important field to note here is the CA bundle. A custom admission webhook must run its HTTP server with TLS because the API server communicates only over HTTPS. So the webhook server runs with a server certificate and key, and the “caBundle” field in the configuration carries the CA (Certificate Authority) information so that the API server can recognize the server certificate.

    The problem here is how to handle this server certificate and the key—and how to get this CA bundle and pass this information to the API server using MutatingWebhookconfiguration or ValidatingWebhookConfiguration. This will be the main focus of the following part.

    Here, we are going to use a self-signed certificate for the webhook server. Now, this self-signed certificate can be made available to the webhook server using different ways. Two possible ways are:

    • Create a Kubernetes secret containing certificate and key and mount that as volume on to the server pod
    • Create the certificate and key in a shared volume, e.g., an emptyDir volume, and have the server consume them from that volume

    However, even after doing any of the above two possible ways, the remaining important part is to add the CA bundle in mutation/validation configs.

    So, instead of doing all these steps manually, we will make use of Kubernetes init containers to perform all these functions for us.

    Custom Admission Webhook Server Init Container:

    The main function of this init container will be to create a self-signed webhook server certificate and provide the CA bundle to the API server via the mutation/validation configs. How the webhook server consumes this certificate (via a secret volume or an emptyDir volume) depends on the use case. This init container will run a simple Go binary to perform all these functions.

    package main
    
    import (
    	"bytes"
    	cryptorand "crypto/rand"
    	"crypto/rsa"
    	"crypto/x509"
    	"crypto/x509/pkix"
    	"encoding/pem"
    	"fmt"
    	log "github.com/sirupsen/logrus"
    	"math/big"
    	"os"
    	"time"
    )
    
    func main() {
    	var caPEM, serverCertPEM, serverPrivKeyPEM *bytes.Buffer
    	// CA config
    	ca := &x509.Certificate{
    		SerialNumber: big.NewInt(2020),
    		Subject: pkix.Name{
    			Organization: []string{"velotio.com"},
    		},
    		NotBefore:             time.Now(),
    		NotAfter:              time.Now().AddDate(1, 0, 0),
    		IsCA:                  true,
    		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
    		KeyUsage:              x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
    		BasicConstraintsValid: true,
    	}
    
    	// CA private key
    	caPrivKey, err := rsa.GenerateKey(cryptorand.Reader, 4096)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// Self signed CA certificate
    	caBytes, err := x509.CreateCertificate(cryptorand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// PEM encode CA cert
    	caPEM = new(bytes.Buffer)
    	_ = pem.Encode(caPEM, &pem.Block{
    		Type:  "CERTIFICATE",
    		Bytes: caBytes,
    	})
    
    	dnsNames := []string{"webhook-service",
    		"webhook-service.default", "webhook-service.default.svc"}
    	commonName := "webhook-service.default.svc"
    
    	// server cert config
    	cert := &x509.Certificate{
    		DNSNames:     dnsNames,
    		SerialNumber: big.NewInt(1658),
    		Subject: pkix.Name{
    			CommonName:   commonName,
    			Organization: []string{"velotio.com"},
    		},
    		NotBefore:    time.Now(),
    		NotAfter:     time.Now().AddDate(1, 0, 0),
    		SubjectKeyId: []byte{1, 2, 3, 4, 6},
    		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
    		KeyUsage:     x509.KeyUsageDigitalSignature,
    	}
    
    	// server private key
    	serverPrivKey, err := rsa.GenerateKey(cryptorand.Reader, 4096)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// sign the server cert
    	serverCertBytes, err := x509.CreateCertificate(cryptorand.Reader, cert, ca, &serverPrivKey.PublicKey, caPrivKey)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// PEM encode the  server cert and key
    	serverCertPEM = new(bytes.Buffer)
    	_ = pem.Encode(serverCertPEM, &pem.Block{
    		Type:  "CERTIFICATE",
    		Bytes: serverCertBytes,
    	})
    
    	serverPrivKeyPEM = new(bytes.Buffer)
    	_ = pem.Encode(serverPrivKeyPEM, &pem.Block{
    		Type:  "RSA PRIVATE KEY",
    		Bytes: x509.MarshalPKCS1PrivateKey(serverPrivKey),
    	})
    
    	err = os.MkdirAll("/etc/webhook/certs/", 0666)
    	if err != nil {
    		log.Panic(err)
    	}
    	err = WriteFile("/etc/webhook/certs/tls.crt", serverCertPEM)
    	if err != nil {
    		log.Panic(err)
    	}
    
    	err = WriteFile("/etc/webhook/certs/tls.key", serverPrivKeyPEM)
    	if err != nil {
    		log.Panic(err)
    	}
    
    }
    
    // WriteFile writes data in the file at the given path
    func WriteFile(filepath string, sCert *bytes.Buffer) error {
    	f, err := os.Create(filepath)
    	if err != nil {
    		return err
    	}
    	defer f.Close()
    
    	_, err = f.Write(sCert.Bytes())
    	if err != nil {
    		return err
    	}
    	return nil
    }

    The steps to generate self-signed CA and sign webhook server certificate using this CA in Golang:

    • Create a config for the CA, ca in the code above.
    • Create an RSA private key for this CA, caPrivKey in the code above.
    • Generate a self-signed CA, caBytes, and caPEM above. Here caPEM is the PEM encoded caBytes and will be the CA bundle given to the API server.
    • Create a config for webhook server certificate, cert in the code above. The important field in this configuration is the DNSNames and commonName. This name must be the full webhook service name of the webhook server to reach the webhook pod.
    • Create an RSA private key for the webhook server, serverPrivKey in the code above.
    • Create server certificate using ca and caPrivKey, serverCertBytes in the code above.
    • Now, PEM encode the serverPrivKey and serverCertBytes. These serverPrivKeyPEM and serverCertPEM buffers are the TLS key and certificate and will be consumed by the webhook server.

    At this point, we have generated the required certificate, key, and CA bundle using init container. Now we will share this server certificate and key with the actual webhook server container in the same pod. 

    • One approach is to create an empty secret resource beforehand and create the webhook deployment, passing the secret name as an environment variable. The init container will generate the server certificate and key and populate the empty secret with the certificate and key information. This secret will be mounted onto the webhook server container to start the HTTP server with TLS.
    • The second approach (used in the code above) is to use Kubernetes’ native pod-specific emptyDir volume. This volume is shared between both containers. In the code above, we can see the init container writing the certificate and key information to files on a particular path. This path is where the emptyDir volume is mounted, and the webhook server container will read the certificate and key for its TLS configuration from that path and start the HTTP webhook server.

    The pod spec will look something like this:

    spec:
      initContainers:
        - image: <webhook init-image name>
          imagePullPolicy: IfNotPresent
          name: webhook-init
          volumeMounts:
            - mountPath: /etc/webhook/certs
              name: webhook-certs
      containers:
        - image: <webhook server image name>
          imagePullPolicy: IfNotPresent
          name: webhook-server
          volumeMounts:
            - mountPath: /etc/webhook/certs
              name: webhook-certs
              readOnly: true
      volumes:
        - name: webhook-certs
          emptyDir: {}

    The only part remaining is to give this CA bundle information to the API server using mutation/validation configs. This can be done in two ways:

    • Patch the CA bundle in the existing MutatingWebhookConfiguration or ValidatingWebhookConfiguration using Kubernetes go-client in the init container.
    • Create MutatingWebhookConfiguration or ValidatingWebhookConfiguration in the init container itself with CA bundle information in configs.

    Here, we will create the configs through the init container. To get certain parameters, like the mutation config name, webhook service name, and webhook namespace dynamically, we can take these values from the init container’s env:

    initContainers:
      - image: <webhook init-image name>
        imagePullPolicy: IfNotPresent
        name: webhook-init
        volumeMounts:
          - mountPath: /etc/webhook/certs
            name: webhook-certs
        env:
          - name: MUTATE_CONFIG
            value: mutating-webhook-configuration
          - name: VALIDATE_CONFIG
            value: validating-webhook-configuration
          - name: WEBHOOK_SERVICE
            value: webhook-service
          - name: WEBHOOK_NAMESPACE
            value: default

    To create MutatingWebhookConfiguration, we will add the below piece of code in init container code below the certificate generation code.

    package main
    
    import (
    	"bytes"
    	"context"
    	admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    	"os"
    	ctrl "sigs.k8s.io/controller-runtime"
    )
    
    func createMutationConfig(caCert *bytes.Buffer) {
    
    	var (
    		webhookNamespace, _ = os.LookupEnv("WEBHOOK_NAMESPACE")
    		mutationCfgName, _  = os.LookupEnv("MUTATE_CONFIG")
    		// validationCfgName, _ = os.LookupEnv("VALIDATE_CONFIG") Not used here in below code
    		webhookService, _ = os.LookupEnv("WEBHOOK_SERVICE")
    	)
    	config := ctrl.GetConfigOrDie()
    	kubeClient, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		panic("failed to set go -client")
    	}
    
    	path := "/mutate"
    	fail := admissionregistrationv1.Fail
    
    	mutateconfig := &admissionregistrationv1.MutatingWebhookConfiguration{
    		ObjectMeta: metav1.ObjectMeta{
    			Name: mutationCfgName,
    		},
    		Webhooks: []admissionregistrationv1.MutatingWebhook{{
    			Name: "mapplication.kb.io",
    			ClientConfig: admissionregistrationv1.WebhookClientConfig{
    				CABundle: caCert.Bytes(), // CA bundle created earlier
    				Service: &admissionregistrationv1.ServiceReference{
    					Name:      webhookService,
    					Namespace: webhookNamespace,
    					Path:      &path,
    				},
    			},
    			Rules: []admissionregistrationv1.RuleWithOperations{{Operations: []admissionregistrationv1.OperationType{
    				admissionregistrationv1.Create},
    				Rule: admissionregistrationv1.Rule{
    					APIGroups:   []string{"apps"},
    					APIVersions: []string{"v1"},
    					Resources:   []string{"deployments"},
    				},
    			}},
    			FailurePolicy: &fail,
    		}},
    	}
      
    	// Note: with client-go >= v0.18 Create takes a context and options;
    	// older client-go versions accept only the object.
    	if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(
    		context.TODO(), mutateconfig, metav1.CreateOptions{}); err != nil {
    		panic(err)
    	}
    }

    The code above is just sample code to create the MutatingWebhookConfiguration. First, we import the required packages. Then, we read the environment variables like webhookNamespace, etc. Next, we define the MutatingWebhookConfiguration struct with the CA bundle information (created earlier) and other required information. Finally, we create the configuration using the go-client. The same approach can be followed for creating the ValidatingWebhookConfiguration. For cases of pod restart or deletion, we can add extra logic in the init container, like deleting the existing configs before creating new ones, or updating only the CA bundle if the configs already exist.

    For certificate rotation, the approach will be different for each approach taken for serving this certificate to the server container:

    • If we are using emptyDir volume, then the approach will be to just restart the webhook pod. As emptyDir volume is ephemeral and bound to the lifecycle of the pod, on restart, a new certificate will be generated and served to the server container. A new CA bundle will be added in configs if configs already exist.
    • If we are using secret volume, then, while restarting the webhook pod, the expiration of the existing certificate from the secret can be checked to decide whether to use the existing certificate for the server or create a new one.

    In both cases, a webhook pod restart is required to trigger the certificate rotation/renewal process. When and how the webhook pod is restarted will vary depending on the use case. A few possible ways are using a cron job, controllers, etc.

    Now, our custom webhook is registered, the API server can read CA bundle information through configs, and the webhook server is ready to serve the mutation/validation requests as per rules defined in configs. 

    Conclusion:

    We covered how to add additional mutation/validation checks by registering our own custom admission webhook server. We also covered how to automatically handle the webhook server TLS certificate and key using init containers and pass the CA bundle information to the API server through the mutation/validation configs.


  • Real Time Analytics for IoT Data using Mosquitto, AWS Kinesis and InfluxDB

    The Internet of Things (IoT) is maturing rapidly and finding applications across various industries. Every common device that we use is turning into a smart device, and smart devices are basically IoT devices. These devices capture various parameters in and around their environment, generating a huge amount of data. This data needs to be collected, processed, stored, and analyzed in order to get actionable insights from it. To do so, we need to build a data pipeline. In this blog, we will build such a pipeline using Mosquitto, Kinesis, InfluxDB, and Grafana. We will discuss each of these components of the pipeline and the steps to build it.

    Why the Analysis of IoT data is different

    In an IoT setup, the data is generated by sensors that are distributed across various locations. In order to use the data generated by these sensors, we first need to get it to a common location from where the various applications that want to process it can read it.

    Network Protocol

    IoT devices have low computational and network resources. Moreover, these devices write data at very short intervals, so high throughput is expected on the network. For transferring IoT data, it is desirable to use lightweight network protocols. A protocol like HTTP uses a complex structure for communication, consuming more resources and making it unsuitable for IoT data transfer. One lightweight protocol suitable for IoT data is MQTT, which we are using in our pipeline. MQTT is designed for machine-to-machine (M2M) connectivity. It uses a publisher/subscriber communication model and helps clients distribute telemetry data with very low network resource consumption. Beyond IoT, MQTT has been found to be useful in other fields as well.

    Other similar protocols include Constrained Application Protocol (CoAP), Advanced Message Queuing Protocol (AMQP) etc.

    Datastore   

    IoT devices generally collect telemetry about their environment, usually through sensors. In most IoT scenarios, we try to analyze how things have changed over a period of time. Storing this data in a time series database makes our analysis simpler and better. InfluxDB is a popular time series database which we will use in our pipeline. More about time series databases can be read here.

    Pipeline Overview

    The first thing we need for a data pipeline is data. As shown in the image above, the data generated by various sensors is written to a topic in the MQTT message broker. To mimic sensors, we will use a program which uses an MQTT client to write data to the MQTT broker.

    The next component is Amazon Kinesis, which is used for streaming data analysis. It closely resembles Apache Kafka, an open-source tool used for similar purposes. Kinesis brings the data generated by a number of clients to a single location from where different consumers can pull it for processing. We are using Kinesis so that multiple consumers can read data from a single location. This approach scales well even if we have multiple message brokers.

    Once the data is written to the MQTT broker, a Kinesis producer that has subscribed to the topic pulls the data from the broker and writes it to the Kinesis stream. From the Kinesis stream, the data is pulled by Kinesis consumers, which process it and write it to InfluxDB, a time series database.

    Finally, we use Grafana, a well-known tool for analytics and monitoring; we can connect it to many popular databases and perform analytics and monitoring on top of them. Another popular tool in this space is Kibana (the K of the ELK stack).

    Setting up an MQTT Message Broker Server:

    For the MQTT message broker, we will use Mosquitto, a popular open-source message broker that implements MQTT. The details of downloading and installing Mosquitto for various platforms are available here.

    For Ubuntu, it can be installed using the following commands

    sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    sudo apt-get update
    sudo apt-get install mosquitto
    service mosquitto status

    Setting up InfluxDB and Grafana

    The simplest way to set up both these components is to use their docker image directly

    docker run --name influxdb -p 8083:8083 -p 8086:8086 influxdb:1.0
    docker run --name grafana -p 3000:3000 --link influxdb grafana/grafana:3.1.1

    For InfluxDB we have mapped two ports: port 8086 is the HTTP API endpoint, while 8083 is the administration web server's port. We need to create a database where we will write our data.

    For creating a database, we can directly go to the console at <influxdb-ip>:8083 and run the command:

    CREATE DATABASE "iotdata"

    Or we can do it via HTTP request :

    curl -XPOST "http://localhost:8086/query" --data-urlencode "q=CREATE DATABASE iotdata"

    Creating a Kinesis stream

    In Kinesis, we create streams where the Kinesis producers write the data coming from various sources and then the Kinesis consumers read the data from the stream. In the stream, the data is stored in various shards. For our purpose, one shard would be enough.
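
    For example, such a stream with a single shard can be created with the AWS CLI (the stream name iotdata is an assumption; use whatever name your config package refers to):

    aws kinesis create-stream --stream-name iotdata --shard-count 1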

    Creating the MQTT client

    We will use the Golang client available in this repository to connect with our message broker server and write data to a specific topic. We will first create a new MQTT client. Here we can see the list of options we have for configuring our MQTT client.

    Once we create the options object, we can pass it to the NewClient() method, which returns the MQTT client. Now we can write data to the MQTT server. We have defined the structure of the data in the SensorData struct. To mimic two sensors writing telemetry data to the MQTT broker, we have two goroutines which push data to the MQTT server every five seconds.

    package publisher
    
    import (
    	"config"
    	"encoding/json"
    	"fmt"
    	"log"
    	"math/rand"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    type SensorData struct {
    	Id          string  `json:"id"`
    	Temperature float64 `json:"temperature"`
    	Humidity    float64 `json:"humidity"`
    	Timestamp   int64   `json:"timestamp"`
    	City        string  `json:"city"`
    }
    
    func StartMQTTPublisher() {
    	fmt.Println("MQTT publisher Started")
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker(config.GetMqttServerurl()).SetClientID("MqttPublisherClient")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetPingTimeout(1 * time.Second)
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    
    	go func() {
    		t := 20.04
    		h := 32.06
    		for i := 0; i < 100; i++ {
    			sensordata := SensorData{
    				Id:          "CITIMUM",
    				Temperature: t,
    				Humidity:    h,
    				Timestamp:   time.Now().Unix(),
    				City:        "Mumbai",
    			}
    			requestBody, err := json.Marshal(sensordata)
    			if err != nil {
    				fmt.Println(err)
    			}
    			token := c.Publish(config.GetMQTTTopicName(), 0, false, requestBody)
    			token.Wait()
    			if i < 50 {
    				t = t + 1*rand.Float64()
    				h = h + 1*rand.Float64()
    			} else {
    				t = t - 1*rand.Float64()
    				h = h - 1*rand.Float64()
    			}
    			time.Sleep(5 * time.Second)
    		}
    	}()
    	go func() {
    		t := 16.02
    		h := 24.04
    		for i := 0; i < 100; i++ {
    			sensordata := SensorData{
    				Id:          "CITIPUN",
    				Temperature: t,
    				Humidity:    h,
    				Timestamp:   time.Now().Unix(),
    				City:        "Pune",
    			}
    			requestBody, err := json.Marshal(sensordata)
    			if err != nil {
    				fmt.Println(err)
    			}
    			token := c.Publish(config.GetMQTTTopicName(), 0, false, requestBody)
    			token.Wait()
    			if i < 50 {
    				t = t + 1*rand.Float64()
    				h = h + 1*rand.Float64()
    			} else {
    				t = t - 1*rand.Float64()
    				h = h - 1*rand.Float64()
    			}
    			time.Sleep(5 * time.Second)
    		}
    	}()
    	time.Sleep(1000 * time.Second)
    	c.Disconnect(250)
    
    }

    Create a Kinesis Producer

    Now we will create a Kinesis producer which subscribes to the topic to which our MQTT client writes data, pulls the data from the broker, and pushes it to the Kinesis stream. Just like in the previous section, here also we first create an MQTT client which connects to the message broker and subscribes to the topic to which our clients/publishers are going to write data. In the client options, we can define a function which will be called whenever data is written to this topic. We have created a function postDataTokinesisStream() which connects to Kinesis using the Kinesis client and writes data to the Kinesis stream every time a message is published to the topic.

    package producer
    
    import (
    	"config"
    	"fmt"
    
    	"os"
    	"time"
    
    	"github.com/aws/aws-sdk-go/service/kinesis"
    
    	mqtt "github.com/eclipse/paho.mqtt.golang"
    )
    
    func postDataTokinesisStream(client mqtt.Client, message mqtt.Message) {
    	fmt.Printf("Received message on topic: %snMessage: %sn", message.Topic(), message.Payload())
    	streamName := config.GetKinesisStreamName()
    	kclient := config.GetKinesisClient()
    	var putRecordInput kinesis.PutRecordInput
    	partitionKey := message.Topic()
    	putRecordInput.PartitionKey = &partitionKey
    	putRecordInput.StreamName = &streamName
    	putRecordInput.Data = message.Payload()
    	putRecordOutput, err := kclient.PutRecord(&putRecordInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(putRecordOutput)
    	}
    
    }
    
    func StartKinesisProducer() {
    	fmt.Println("Kinesis Producer Started")
    	c := make(chan os.Signal, 1)
    	opts := mqtt.NewClientOptions().AddBroker(config.GetMqttServerurl()).SetClientID("MqttSubscriberClient")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetPingTimeout(1 * time.Second)
    	opts.OnConnect = func(c mqtt.Client) {
    		if token := c.Subscribe(config.GetMQTTTopicName(), 0, postDataTokinesisStream); token.Wait() && token.Error() != nil {
    			panic(token.Error())
    		}
    	}
    
    	client := mqtt.NewClient(opts)
    	if token := client.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	} else {
    		fmt.Printf("Connected to %sn", config.GetMqttServerurl())
    	}
    
    	<-c
    }

    Create a Kinesis Consumer

    Now that the data is available in our Kinesis stream, we can pull it for processing. In the Kinesis consumer, we create a Kinesis client just like we did in the previous section and then pull data from it. Here we first make a call to the DescribeStream method, which returns the shardId. We then use this shardId to get a ShardIterator, and finally we fetch the records by passing the ShardIterator to the GetRecords() method. GetRecords() also returns the NextShardIterator, which we can use to continuously look for records in the shard until NextShardIterator becomes null.

    package consumer
    
    import (
    	"config"
    	"fmt"
    
    	"github.com/aws/aws-sdk-go/service/kinesis"
    	"velotio.com/dao"
    )
    
    func StartKinesisConsumer() {
    	fmt.Println("Kinesis Consumer Started")
    	client := config.GetKinesisClient()
    	streamName := config.GetKinesisStreamName()
    	var describeStreamInput kinesis.DescribeStreamInput
    	describeStreamInput.StreamName = &streamName
    	describeStreamOutput, err := client.DescribeStream(&describeStreamInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(*describeStreamOutput.StreamDescription.Shards[0].ShardId)
    	}
    	var getShardIteratorInput kinesis.GetShardIteratorInput
    	getShardIteratorInput.ShardId = describeStreamOutput.StreamDescription.Shards[0].ShardId
    	getShardIteratorInput.StreamName = &streamName
    	shardIteratorType := "TRIM_HORIZON"
    	getShardIteratorInput.ShardIteratorType = &shardIteratorType
    	getShardIteratorOuput, err := client.GetShardIterator(&getShardIteratorInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(*getShardIteratorOuput.ShardIterator)
    	}
    	var getRecordsInput kinesis.GetRecordsInput
    
    	getRecordsInput.ShardIterator = getShardIteratorOuput.ShardIterator
    	getRecordsOuput, err := client.GetRecords(&getRecordsInput)
    	//fmt.Println(getRecordsOuput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		for *getRecordsOuput.NextShardIterator != "" {
    			i := 0
    			for i < len(getRecordsOuput.Records) {
    				//fmt.Println(len(getRecordsOuput.Records))
    				sdf := &dao.SensorDataFiltered{}
    				sdf.PostDataToInfluxDB(getRecordsOuput.Records[i].Data)
    				i++
    			}
    			getRecordsInput.ShardIterator = getRecordsOuput.NextShardIterator
    			getRecordsOuput, err = client.GetRecords(&getRecordsInput)
    		}
    
    	}
    }

    Processing the data and writing it to InfluxDB

    Now we do some simple processing to filter the data. The data that we get from the sensor has the fields sensorId, temperature, humidity, city, and timestamp, but we are only interested in the values of temperature and humidity for a city, so we have created a new struct ‘SensorDataFiltered’ which contains only the fields we need.

    For every record that the Kinesis consumer receives, it creates an instance of the SensorDataFiltered type and calls the PostDataToInfluxDB() method, where the record received from the Kinesis stream is unmarshaled into the SensorDataFiltered type and sent to InfluxDB. Here we need to provide the name of the database we created earlier in the variable dbName and the InfluxDB host and port values in dbHost and dbPort respectively.

    In the InfluxDB request body, the first value that we provide is used as the measurement, which is an InfluxDB construct to store similar data together. Then we have tags; we have used `city` as our tag so that we can filter the data based on it, and then we have the actual field values. For more details on the InfluxDB line protocol (data write format), please refer here.
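
    For reference, a single write in the InfluxDB line protocol (a sketch assuming the measurement and tag names used in the code below) looks like this:

    sensordata,city=Mumbai humidity=33.15,temperature=21.70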

    package dao
    
    import (
    	"bytes"
    	"crypto/tls"
    	"encoding/json"
    	"fmt"
    	"net/http"
    )
    
    type SensorDataFiltered struct {
    	Temperature float64 `json:"temperature"`
    	Humidity    float64 `json:"humidity"`
    	City        string  `json:"city"`
    }
    
    var dbName = "iotdata"
    var dbHost = "184.73.62.30"
    var dbPort = "8086"
    
    func (sdf *SensorDataFiltered) PostDataToInfluxDB(Data []byte) {
    	err := json.Unmarshal(Data, &sdf)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(sdf.Temperature, sdf.Humidity)
    	}
    	url := "http://" + dbHost + ":" + dbPort + "/write?db=" + dbName
    	humidity := fmt.Sprintf("%.2f", sdf.Humidity)
    	temperature := fmt.Sprintf("%.2f", sdf.Temperature)
    	city := sdf.City
    	requestBody := "sensordata,city=" + city + " humidity=" + humidity + ",temperature=" + temperature
    	req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(requestBody)))
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	httpclient := &http.Client{
    		Transport: &http.Transport{
    			TLSClientConfig: &tls.Config{
    				InsecureSkipVerify: true,
    			},
    		},
    	}
    	resp, err := httpclient.Do(req)
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	// Close the response body only after a successful request to avoid a nil pointer dereference.
    	defer resp.Body.Close()
    	fmt.Println("Status code for influxdb data port request = ", resp.StatusCode)
    
    }

    Once the data is written to InfluxDB, we can see it in the web console by querying the measurement created in our database.

    Putting everything together in our main function

    Now we simply need to call the functions we discussed above and run our main program. Note that we have used `go` before the first two function calls, which makes them goroutines so that they execute concurrently.

    On running the code, you will see the logs for all the stages of our pipeline being written to stdout, and it closely resembles real-life scenarios where data is written by IoT devices and gets processed in near real-time.

    package main
    
    import (
    	"time"
    
    	"velotio.com/consumer"
    	"velotio.com/producer"
    	"velotio.com/publisher"
    )
    
    func main() {
    
    	go producer.StartKinesisProducer()
    	go publisher.StartMQTTPublisher()
    	time.Sleep(5 * time.Second)
    	consumer.StartKinesisConsumer()
    
    }

    Visualization through Grafana

    We can access the Grafana web console at port 3000 of the machine on which it is running. First, we need to add our InfluxDB as a data source to it under the data sources option.

    For creating a dashboard, go to the dashboards option and choose New. Once the dashboard is created, we can start by adding a panel.

    We need to select the InfluxDB data source that we added earlier as the panel data source and write queries as shown in the image below.
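
    A panel query along the following lines should work (a sketch assuming the measurement and tag names used in the consumer code, i.e. sensordata and city; $timeFilter and $__interval are Grafana template variables):

    SELECT mean("temperature") FROM "sensordata" WHERE "city" = 'Mumbai' AND $timeFilter GROUP BY time($__interval)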

    We can repeat the same process for adding another panel to the dashboard, this time choosing a different city in our query.

    Conclusion:

    IoT data analytics is a fast-evolving and interesting space. The number of IoT devices is growing rapidly, and there is a great opportunity to get valuable insights from the huge amount of data generated by these devices. In this blog, I tried to help you grab that opportunity by building a near real-time data pipeline for IoT data. If you liked it, please share and subscribe to our blog.

  • A Step Towards Simplified Querying in NodeJS

    Recently, I came across a question on StackOverflow regarding querying data on a relationship table using Sequelize, and it took me back to the time I faced the same situation, so I decided to write a blog about a better alternative: Objection.js. When we choose ORMs without looking into the use case we are tackling, we usually end up with a mess.

    The question on StackOverflow was about converting the below query into a Sequelize query.

    SELECT a.* 
    FROM employees a, emp_dept_details b 
    WHERE b.Dept_Id=2 AND a.Emp_No = b.Emp_Id

    (Pardon the naming in the query; it was asked by a novice programmer and I wanted to keep it as-is for purity's sake.)

    Seems pretty straightforward right? So the solution is like below:

    Employee.findAll({ 
      include: [{ 
        model: EmployeeDeptDetails, 
        where: { 
          Emp_Id: Sequelize.col('employees.Emp_No'), 
          Dept_Id: 2 
        } 
      }] 
    });

    If you look at this, it's a rather complex solution for simple querying, and it grows with added relationships. Also, for simple queries like this, the Sequelize documentation is not sufficient. Now, if you ask me how it can be done in a better way with Objection.js, below is the same query in Objection.

    Employee.query()
      .joinRelation('employeeDeptDetails')
      .where({ 'employeeDeptDetails.Dept_Id': 2 })

    Note: It's assumed that the relationship is defined (in the model classes) in both examples.

    Now you can see the difference. This is just one example I came across; there are others on the internet for better understanding. So, are you ready to dive into Objection.js?

    But before we dive in, I want to point out that whenever we search online for a Node.js ORM, we always find some people saying “don't use an ORM, just write plain SQL”, and they have a point. If your app is small enough that you can write a bunch of query helper functions and carry out all the needed functionality, then don't go with the ORM approach; just use plain SQL.

    But when your app has an ample number of tables and relationships between them that need to be defined, and multi-join queries need to be written, that is where the power of an ORM comes in.

    So when we search for the ORMs (for relational DBs) available in the NodeJS arena, we usually get the list below:

    1. Sequelize

    2. Objection.js

    3. typeORM

    There are others, I have just mentioned more popular ones.

    Well, I have personally used both Sequelize and Objection.js, as they are the most popular ORMs available today. So if you are deciding on which ORM to use for your next project, or got frustrated with the relationship query complexity of `Sequelize`, then you have landed in the right place.

    I am going to be honest here: the fact that I am currently using Objection.js doesn't make it the de facto or best ORM for NodeJS. If you don't love writing SQL-resembling queries and prefer a fully abstracted query syntax, then I think `Sequelize` is the right option for you (though you might struggle with relationship queries as I did and land up with Objection.js later on), but if you want your queries to resemble the SQL ones, then you should read this blog.

    What Makes Objection So Special?

    1. Objection under the hood uses Knex.js, a powerful SQL query builder

    2. Lets you create models for tables with ES6/ES7 classes and define the relationships between them

    3. Make queries with async / await

    4. Add validation to your models using JSON schema

    5. Perform graph inserts and upserts

    to name a few.

    The Learning Curve

    I have relied exclusively on the documentation. The Knex.js and Objection.js documentation is great, and there are simple examples on the Objection GitHub (one of which I am going to use below for explanation). So whether you have previously worked with a NodeJS ORM or you are a newbie, this will help you get started without any struggles.

    So let's get started with some of the important topics while I explain the advantages over other ORMs and their usage along the way.

    For setup (package installation, configuration, etc.) and the full code, you can check out GitHub.

    Creating and Managing DB Schema

    Migration is a good pattern to manage changes to your database schema. Objection.js uses Knex.js migrations for this purpose.

    So what is a migration? Migrations are changes to a database's schema specified within your ORM, so we will be defining the tables and columns of our database directly in JavaScript rather than using SQL.

    One of the best features of Knex is its robust migration support. To create a new migration simply use the knex cli:

    knex migrate:make migration_name

    After running this command you’ll notice that a new file is created within your migrations directory. This file will include a current timestamp as well as the name that you gave to your migration. The file will look like this:

    exports.up = function(knex, Promise) {
    
    };
    
    exports.down = function(knex, Promise) {
    
    };

    As you can notice, the first is `exports.up`, which specifies the commands that should be run to make the database change that you'd like to make, e.g. creating database tables, adding or removing a column from a table, changing indexes, etc.

    The second function within your migration file is `exports.down`. This function's goal is to do the opposite of what `exports.up` did. If `exports.up` created a table, then `exports.down` will drop that table. The reason to include `exports.down` is so that you can quickly undo a migration should you need to.

    For example:

    exports.up = knex => {
      return knex.schema
        .createTable('persons', table => {
          table.increments('id').primary();
          table
    	.integer('parentId')
    	.unsigned()
    	.references('id')
    	.inTable('persons')
    	.onDelete('SET NULL')
    	.index();
          table.string('firstName');
          table.string('lastName');
          table.integer('age');
          table.json('address');
        })
    };
    
    exports.down = knex => {
      return knex.schema
        .dropTableIfExists('persons');
    }; 

    It’s that simple to create the migration. Now you can run your migration like below.

    $ knex migrate:latest

    You can also pass the `--env` flag or set `NODE_ENV` to select an alternative environment:

    $ knex migrate:latest --env production

    To rollback the last batch of migrations:

    $ knex migrate:rollback

    Models

    Models are wrappers around the database tables, they help to encapsulate the business logic within those tables.

    Objection.js allows you to create models using ES classes.

    Before diving into the example, one thing needs to be clear about models: an Objection.js Model does not create any table in the DB. Yes! Models are only used for adding validations and relationship mappings.

    For example:

    const { Model } = require('objection');
    const Animal = require('./Animal');
    
    class Person extends Model {
      // Table name is the only required property.
      static get tableName() {
        return 'persons';
      }
    
      // Optional JSON schema. This is not the database schema. Nothing is generated
      // based on this. This is only used for validation. Whenever a model instance
      // is created it is checked against this schema. http://json-schema.org/.
      static get jsonSchema() {
        return {
          type: 'object',
          required: ['firstName', 'lastName'],
    
          properties: {
    	id: { type: 'integer' },
    	parentId: { type: ['integer', 'null'] },
    	firstName: { type: 'string', minLength: 1, maxLength: 255 },
    	lastName: { type: 'string', minLength: 1, maxLength: 255 },
    	age: { type: 'number' },
    	address: {
    	  type: 'object',
    	  properties: {
    	    street: { type: 'string' },
    	    city: { type: 'string' },
    	    zipCode: { type: 'string' }
    	  }
    	}
          }
        };
      }
    
      // This object defines the relations to other models.
      static get relationMappings() {
        return {
          pets: {
    	relation: Model.HasManyRelation,
    	// The related model. This can be either a Model subclass constructor or an
    	// absolute file path to a module that exports one.
    	modelClass: Animal,
    	join: {
    	  from: 'persons.id',
    	  to: 'animals.ownerId'
    	}
          }
        };	
      }
    }
    
    module.exports = Person;

    • Now let's break it down. The static getter `tableName` returns the table name.
    • We also have a second static getter method that defines the validations for each field, and this is optional. We can specify the required properties, the type of each field (i.e. number, string, object, etc.), and other validations, as you can see in the example.
    • The third static getter function we see is `relationMappings`, which defines this model's relationships to other models. In this case, the key of the outer object, `pets`, is how we will refer to the child class. The join property, in addition to the relation type, defines how the models are related to one another. The from and to properties of the join object define the database columns through which the models are associated. The modelClass passed to the relation mapping is the class of the related model.

    So here `Person` has a `HasManyRelation` with the `Animal` model class, and the join is performed on the persons `id` column and the animals `ownerId` column. So one person can have multiple pets.
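
    For completeness, a minimal sketch of what the Animal model referenced above could look like (the table name is an assumption based on the relation mapping; tableName is the only required property):

    const { Model } = require('objection');
    
    class Animal extends Model {
      // Table name is the only required property.
      static get tableName() {
        return 'animals';
      }
    }
    
    module.exports = Animal;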

    Queries

    Let’s start with simple SELECT queries:

    SELECT * FROM persons;

    Can be done like:

    const persons = await Person.query();

    Little advanced or should I say typical select query:

    SELECT * FROM persons where firstName = 'Ben' ORDER BY age;

    Can be done like:

    const persons = await Person.query()
      .where({ firstName: 'Ben' })
      .orderBy('age');

    We can see how closely Objection queries resemble the actual SQL queries, so it's always easy to transform a SQL query into an Objection.js one, which is quite difficult with other ORMs.

    INSERT Queries:

    INSERT INTO persons (firstName) VALUES ('Ben');

    Can be done like:

    await Person.query().insert({ firstName: 'Ben' });

    UPDATE Queries:

    UPDATE persons set firstName = 'Brayn' where id = 1;

    Can be done like:

    await Person.query().patch({ firstName: 'Brayn' }).where({ id: 1 });

    DELETE Queries:

    DELETE from persons where id = 1;

    Can be done like:

    await Person.query().delete().where({ id: 1 });

    Relationship Queries:

    Suppose we have already fetched a `person` instance and want to fetch all of that person's pets named 'Ben'.

    const pets = await person
      .$relatedQuery('pets')
      .where('name', 'Ben');

    Now suppose you want to insert a person along with their pets. In this case, we can use graph inserts.

    const personWithPets = {
      firstName: 'Matt',
      lastName: 'Damon',
      age: 43,
    
      pets: [
        {
          name: 'Doggo',
          species: 'dog'
        },
        {
          name: 'Kat',
          species: 'cat'
        }
      ]
    };
    
    // Wrap the `insertGraph` call in a transaction since it creates multiple queries.
    const insertedGraph = await transaction(Person.knex(), trx => {
      return (
        Person.query(trx).insertGraph(personWithPets)
      );
    });

    So here we can see the power of Objection queries; if you compare these queries with those of other ORMs, you will see for yourself which one is better.

    Plugin Availability

    objection-password: This plugin adds automatic password hashing to your Objection.js models. This makes it super easy to secure passwords and other sensitive data.

    objection-graphql: Automatic GraphQL API generator for objection.js models.

    Verdict

    I am having a fun time working with Objection and Knex currently! If you ask me to choose between Sequelize and Objection.js, I would definitely go with Objection.js to avoid all the relationship-query pain. It's worth noting that Objection.js is unlike your other ORMs: it's just a wrapper over the Knex.js query builder, so it's like using a query builder with additional features.

  • Your Quintessential Guide to AWS Athena

    Introduction

    Serverless has become a new trend today and is here to stay for sure! Now when you think of wireless internet, you know that it still has some wires but you don’t need to worry about them as you don’t have to maintain them. Similarly, serverless has servers but you don’t have to keep worrying about handling or maintaining them. All you need to do is focus on your code and you’re good to go.

    It has some more benefits, such as:

    • Zero administration: You can deploy code without provisioning anything beforehand, or managing anything later. There is no concept of a fleet, an instance, or even an operating system.
    • Auto-scaling: It lets your service providers manage the scaling challenges. You don’t need to fire alerts or write scripts to scale up and down. It handles quick bursts of traffic and weekend lulls the same way.
    • Pay-per-use: The function-as-a-service compute and managed services are charged based on usage rather than pre-provisioned capacity. You can have complete resource utilization without paying a cent for idle time. The results? 90% cost-savings over a cloud VM, and the satisfaction of knowing that you never pay for resources you don’t use.

    What is AWS Athena?

    AWS Athena is a similar serverless service. It is more of an interactive query service than a code deployment service.

    Using Athena, one can directly query the data stored in S3 buckets using standard ANSI SQL.

    As mentioned earlier, it works on the principle of serverless, that is, there is no infrastructure to manage, and you pay only for the queries that you run.

    Athena is easy to use. You can simply point to your data in Amazon S3, define the schema, and start querying using standard SQL. Most results are delivered within seconds. With Athena, there’s no need for complex ETL jobs to prepare your data for analysis. This makes it easy for anyone with SQL skills to quickly analyze large-scale datasets.

    It is based on Facebook’s PrestoDB and can be used to query structured and semi-structured data.

    Some Exciting Features of Athena are:

    • Serverless. No ETL – Not having to set up and manage any servers or data warehouses.
    • Only pay for the data that is scanned.
    • You can ensure better performance by compressing, partitioning, and converting your data into columnar formats.
    • Can also handle complex analysis, including large joins, window functions, and arrays.
    • Athena automatically executes queries in parallel.
    • You just need to provide a path to the S3 folder; when new files are added, they are automatically reflected in the table.
    • Supports –
    • CSV, JSON, Parquet, ORC, and Avro data formats
    • Complex Joins and datatypes
    • View creation
    • Does not Support –
    • User-defined functions and stored procedures
    • Hive or Presto transactions
    • LZO (Snappy is supported)

    Pricing of Athena

    • AWS Athena is priced at $5 per TB of data scanned.
    • Queries are rounded up to the nearest MB, with a 10 MB minimum.
    • Users pay for stored data at regular S3 rates.
    • Amazon advises users to use compressed data files, have data in columnar formats, and routinely delete old results sets to keep charges low. Partitioning data in tables can speed up queries and reduce query bills.

    Athena vs. Redshift Spectrum

    • AWS also has Redshift as a data warehouse service, and we can use Redshift Spectrum to query S3 data, so why should you use Athena?

    Advantages of Redshift Spectrum:

    • Allows creation of Redshift tables. You’re able to join Redshift tables with Redshift spectrum tables efficiently.

    If you do not need those things, then you should consider Athena as well. Athena differs from Redshift Spectrum in the following ways:

    • Billing: This is a major difference, and depending on your use case you may find one much cheaper than the other.
    • Performance: Athena is slightly faster.
    • SQL syntax and features: Athena is derived from Presto and is a bit different from Redshift, which has its roots in Postgres.
    • Connectivity: It's easy enough to connect to Athena using the API, JDBC, or ODBC, but many more products offer "standard out of the box" connections to Redshift.
    • Athena has GIS functions and lambdas.

    So in a nutshell, if you have existing Redshift instances, you would probably go for Redshift Spectrum; if not, then you can opt for Athena for querying the data. In some cases, you can use both in tandem.

    Example

    Here are sample queries to create a database having three tables (basic_details, contact_details, and bill_details) on top of CSV files uploaded to S3:

    Basic_details:

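    A definition along the following lines should work (a sketch assuming the columns referenced in the query below, i.e. id, first_name, and gender, plus an assumed last_name column; the S3 location follows the naming of the other tables):

    CREATE EXTERNAL TABLE `basic_details`(
      `id` int COMMENT '', 
      `first_name` string COMMENT '', 
      `last_name` string COMMENT '', 
      `gender` string COMMENT '')
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://athena-blog/basic-details'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'skip.header.line.count'='1')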

    Bill_details:

    CREATE EXTERNAL TABLE `bil_details`(
      `id` int COMMENT '', 
      `amount_paid` string COMMENT '', 
      `amount_due` string COMMENT '')
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://athena-blog/bill-details'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'skip.header.line.count'='1')

    Contact_details:

    CREATE EXTERNAL TABLE `contact_details`(
      `id` int COMMENT '', 
      `street` string COMMENT '', 
      `city` string COMMENT '', 
      `state` string COMMENT '', 
      `country` string COMMENT '', 
      `zip` string COMMENT '')
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://athena-blog/contact-details'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'skip.header.line.count'='1')

    Sample Query – First names of male customers from Minnesota with amount_due > $100

    WITH basic AS 
        (SELECT id,
             first_name
        FROM basic_details
        WHERE lower(gender) = 'male' ), bill AS 
        (SELECT id
        FROM bil_details
        WHERE CAST(amount_due AS INTEGER) > 100 ), contact AS 
        (SELECT contact_details.id
        FROM contact_details
        JOIN bill
            ON contact_details.id = bill.id
        WHERE state= 'Minnesota' )
    SELECT basic.first_name
    FROM basic
    JOIN contact
        ON basic.id = contact.id 

    Output:

    Some Other Sample Queries:

    1. Searching for Values in JSON

    WITH dataset AS (
      SELECT * FROM (VALUES
        (JSON '{"name": "Bob Smith", "org": "legal", "projects": ["project1"]}'),
        (JSON '{"name": "Susan Smith", "org": "engineering", "projects": ["project1", "project2", "project3"]}'),
        (JSON '{"name": "Jane Smith", "org": "finance", "projects": ["project1", "project2"]}')
      ) AS t (users)
    )
    SELECT json_extract_scalar(users, '$.name') AS user
    FROM dataset
    WHERE json_array_contains(json_extract(users, '$.projects'), 'project2')

    Output:

    2. Extracting properties

    WITH dataset AS (
      SELECT '{"name": "Susan Smith",
               "org": "engineering",
               "projects": [{"name":"project1", "completed":false},
               {"name":"project2", "completed":true}]}'
        AS blob
    )
    SELECT
      json_extract(blob, '$.name') AS name,
      json_extract(blob, '$.projects') AS projects
    FROM dataset

    Output:

    3. Converting JSON to Athena Data Types

    WITH dataset AS (
      SELECT
        CAST(JSON '"HELLO ATHENA"' AS VARCHAR) AS hello_msg,
        CAST(JSON '12345' AS INTEGER) AS some_int,
        CAST(JSON '{"a":1,"b":2}' AS MAP(VARCHAR, INTEGER)) AS some_map
    )
    SELECT * FROM dataset

    Output:

    Conclusion

    Hence, we can easily say that AWS Athena gives us an efficient way to query our raw data present in different formats in S3 object storage, without spawning a dedicated infrastructure and at minimal cost.

    Need help with setting up AWS Athena for your organization? Connect with the experts at Velotio!

  • Amazon Lex + AWS Lambda: Beyond Hello World

    In my previous blog, I explained how to get started with Amazon Lex and build simple bots. This blog aims at exploring the Lambda functions used by Amazon Lex for code validation and fulfillment. We will go along with the same example we created in the first blog, i.e. purchasing a book, and will see in detail how the dots are connected.

    This blog is divided into following sections:

    1. Lambda function input format
    2. Response format
    3. Managing conversation context
    4. An example (demonstration to understand better how context is maintained to make data flow between two different intents)

    NOTE: Input to a Lambda function will change according to the language you use to create the function. Since we have used NodeJS for our example, everything will thus be explained using it.

    Section 1:  Lambda function input format

    When a conversation is started with a bot, Amazon Lex passes control to the Lambda function we defined while creating the bot.

    There are three arguments that Amazon Lex passes to a Lambda function:

    1. Event:

    event is a JSON object containing all the details regarding a bot conversation. Every time the Lambda function is invoked, Amazon Lex sends the event JSON, which contains the details of the respective message sent by the user to the bot.

    Below is a sample event JSON:

    {
      currentIntent: {
        name: 'orderBook',
        slots: {
          bookType: null,
          bookName: null
        },
        confirmationStatus: 'None'
      },
      bot: {
        name: 'PurchaseBook',
        alias: '$LATEST',
        version: '$LATEST'
      },
      userId: 'user-1',
      inputTranscript: 'buy me a book',
      invocationSource: 'DialogCodeHook',
      outputDialogMode: 'Text',
      messageVersion: '1.0'
    };

    The format of the event JSON is explained below:

    • currentIntent:  It will contain information regarding the intent of message sent by the user to the bot. It contains following keys:
    • name: intent name  (for e.g orderBook, we defined this intent in our previous blog).
    • slots: It will contain a map of slot names configured for that particular intent,  populated with values recognized by Amazon Lex during the conversation. Default values are null.  
    • confirmationStatus:  It provides the user response to a confirmation prompt if there is one. Possible values for this variable are:
    • None: Default value
    • Confirmed: When the user responds with a confirmation w.r.t confirmation prompt.
    • Denied: When the user responds with a deny w.r.t confirmation prompt.
    • inputTranscript: Text input by the user for processing. In the case of audio input, the text is extracted from the audio. This is the text that is actually processed to recognize intents and slot values.
    • invocationSource: Its value directs the reason for invoking the Lambda function. It can have following two values:
    • DialogCodeHook:  This value directs the Lambda function to initialize the validation of user’s data input. If the intent is not clear, Amazon Lex can’t invoke the Lambda function.
    • FulfillmentCodeHook: This value is set to fulfil the intent. If the intent is configured to invoke a Lambda function as a fulfilment code hook, Amazon Lex sets the invocationSource to this value only after it has all the slot data to fulfil the intent.
    • bot: Details of bot that processed the request. It consists of below information:
    • name:  name of the bot.
    • alias: alias of the bot version.
    • version: the version of the bot.
    • userId: Its value is defined by the client application. Amazon Lex passes it to the Lambda function.
    • outputDialogMode:  Its value depends on how you have configured your bot. Its value can be Text / Voice.
    • messageVersion: The version of the message that identifies the format of the event data going into the Lambda function and the expected format of the response from a Lambda function. In the current implementation, only message version 1.0 is supported. Therefore, the console assumes the default value of 1.0 and doesn’t show the message version.
    • sessionAttributes:  Application-specific session attributes that the client sent in the request. It is optional.

    2. Context:

    AWS Lambda uses this parameter to provide the runtime information of the Lambda function that is executing. Some useful information we can get from the context object is:

    • The time remaining before AWS Lambda terminates the Lambda function.
    • The CloudWatch log stream associated with the Lambda function that is executing.
    • The AWS request ID returned to the client that invoked the Lambda function which can be used for any follow-up inquiry with AWS support.

    Section 2: Response Format

    Amazon Lex expects a response from a Lambda function in the following format:

    {
      sessionAttributes: {},
      dialogAction: {
        type: "ElicitIntent / ElicitSlot / ConfirmIntent / Delegate / Close",
        <structure based on type>
      }
    }

    The response consists of two fields: the sessionAttributes field is optional, and the dialogAction field is required. The contents of the dialogAction field depend on the value of the type field.

    • sessionAttributes: This is an optional field, it can be empty. If the function has to send something back to the client it should be passed under sessionAttributes. We will see its use-case in Section-4.
    • dialogAction (Required): Type of this field defines the next course of action. There are five types of dialogAction explained below:-

    1) Close: Informs Amazon Lex not to expect a response from the user. This is the case when all slots get filled. If you don’t specify a message, Amazon Lex uses the goodbye message or the follow-up message configured for the intent.

    dialogAction: {
      type: "Close",
      fulfillmentState: "Fulfilled / Failed", // (required)
      message: { // (optional)
        contentType: "PlainText or SSML",
        content: "Message to convey to the user"
      }
    }

    2) ConfirmIntent: Informs Amazon Lex that the user is expected to give a yes or no answer to confirm or deny the current intent. The slots field must contain an entry for each of the slots configured for the specified intent. If the value of a slot is unknown, you must set it to null. The message and responseCard fields are optional.

    dialogAction: {
      type: "ConfirmIntent",
      intentName: "orderBook",
      slots: {
        bookName: "value",
        bookType: "value"
      },
      message: { // (optional)
        contentType: "PlainText or SSML",
        content: "Message to convey to the user"
      }
    }

    3) Delegate:  Directs Amazon Lex to choose the next course of action based on the bot configuration. The response must include any session attributes, and the slots field must include all of the slots specified for the requested intent. If the value of the field is unknown, you must set it to null. You will get a DependencyFailedException exception if your fulfilment function returns the Delegate dialog action without removing any slots.

    dialogAction: {
      type: "Delegate",
      slots: {
        slot1: "value",
        slot2: "value"
      }
    }

    4) ElicitIntent: Informs Amazon Lex that the user is expected to respond with an utterance that includes an intent. For example, “I want to buy a book”, which indicates the OrderBook intent. The utterance “book”, on the other hand, is not sufficient for Amazon Lex to infer the user's intent.

    dialogAction: {
      type: "ElicitIntent",
      message: { // (optional)
        contentType: "PlainText or SSML",
        content: "Message to convey to the user"
      }
    }

    5) ElicitSlot: Informs Amazon Lex that the user is expected to provide a slot value in the response. In the structure below, we are informing Amazon Lex that the user's response should provide a value for the slot named 'bookName'.

    dialogAction: {
      type: "ElicitSlot",
      intentName: "orderBook",
      slots: {
        bookName: "",
        bookType: "fiction"
      },
      slotToElicit: "bookName",
      message: { // (optional)
        contentType: "PlainText or SSML",
        content: "Message to convey to the user"
      }
    }

    Section 3: Managing Conversation Context

    Conversation context is the information that a user, your application, or a Lambda function provides to an Amazon Lex bot to fulfill an intent. Conversation context includes slot data that the user provides, request attributes set by the client application, and session attributes that the client application and Lambda functions create.

    1. Setting session timeout

    Session timeout is the length of time that a conversation session lasts. For in-progress conversations, Amazon Lex retains the context information, slot data, and session attributes until the session ends. The default session duration is 5 minutes, but it can be changed to up to 24 hours while creating the bot in the Amazon Lex console.

    2. Setting session attributes

    Session attributes contain application-specific information that is passed between a bot and a client application during a session. Amazon Lex passes session attributes to all Lambda functions configured for a bot. If a Lambda function adds or updates session attributes, Amazon Lex passes the new information back to the client application.

    Session attributes persist for the duration of the session. Amazon Lex stores them in an encrypted data store until the session ends.

    3. Sharing information between intents

    If you have created a bot with more than one intent, information can be shared between them using session attributes. Attributes defined while fulfilling an intent can be used in other defined intent.

    For example, a user of the book-ordering bot starts by ordering books. The bot engages in a conversation with the user, gathering slot data such as the book name and quantity. When the user places an order, the Lambda function that fulfils the order sets the lastConfirmedReservation session attribute, containing information regarding the ordered book, and currentReservationPrice, containing the price of the book. Later, when the user fulfils the orderMagazine intent, the final price can be calculated on the basis of currentReservationPrice.

    Section 4:  Example

    The details of example Bot are below:

    Bot Name: PurchaseBot

    Intents :

    • orderBook – bookName, bookType
    • orderMagazine – magazineName, issueMonth

    Session attributes set while fulfilling the intent “orderBook” are:

    1. lastConfirmedReservation: In this variable, we are storing slot values corresponding to intent orderBook.
    2. currentReservationPrice: Book price is calculated and stored in this variable

    When the intent orderBook gets fulfilled, we will ask the user if they also want to order a magazine. If the user responds with a confirmation, the bot will start fulfilling the intent “orderMagazine”. A sketch of such a fulfillment handler is shown below.
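
    A minimal sketch of what the orderBook fulfillment handler could look like in NodeJS (the price lookup and message wording are assumptions; the response structure follows Section 2):

    'use strict';
    
    exports.handler = (event, context, callback) => {
      const sessionAttributes = event.sessionAttributes || {};
      const slots = event.currentIntent.slots;
    
      // Hypothetical price lookup for the ordered book.
      const price = 20;
    
      // Persist data for the orderMagazine intent via session attributes.
      sessionAttributes.lastConfirmedReservation = JSON.stringify(slots);
      sessionAttributes.currentReservationPrice = String(price);
    
      callback(null, {
        sessionAttributes,
        dialogAction: {
          type: 'Close',
          fulfillmentState: 'Fulfilled',
          message: {
            contentType: 'PlainText',
            content: 'Your order for ' + slots.bookName + ' is placed. Would you also like to order a magazine?'
          }
        }
      });
    };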

    Conclusion

    AWS Lambda functions are used as code hooks for your Amazon Lex bot. You can identify Lambda functions to perform initialization and validation, fulfillment, or both in your intent configuration. This blog brought more technical insight into how Amazon Lex works and how it communicates with Lambda functions. It also explains how conversation context is maintained using session attributes. I hope you find the information useful.

  • Ensure Continuous Delivery On Kubernetes With GitOps’ Argo CD

    What is GitOps?

    GitOps is a Continuous Deployment model for cloud-native applications. In GitOps, the Git repositories which contain the declarative descriptions of the infrastructure are considered the single source of truth for the desired state of the system, and we need an automated way to ensure that the deployed state of the system always matches the state defined in the Git repository. All the changes (deployment/upgrade/rollback) to the environment are triggered by changes (commits) made to the Git repository.

    The artifacts that we run in any environment always have corresponding code in some Git repository. Can we say the same thing for our infrastructure?

    Infrastructure-as-code tools and completely declarative orchestration tools like Kubernetes allow us to represent the entire state of our system in a declarative way. GitOps intends to make use of this ability and make infrastructure-related operations more developer-centric.

    Role of Infrastructure as Code (IaC) in GitOps

    The ability to represent the infrastructure as code is at the core of GitOps. But just having version-controlled infrastructure as code doesn't mean GitOps; we also need to have a mechanism in place to keep (or try to keep) our deployed state in sync with the state we define in the Git repository.

    Infrastructure as Code is necessary but not sufficient to achieve GitOps

    GitOps does pull-based deployments

    Most of the deployment pipelines we see currently, push the changes in the deployed environment. For example, consider that we need to upgrade our application to a newer version then we will update its docker image tag in some repository which will trigger a deployment pipeline and update the deployed application. Here the changes were pushed on the environment. In GitOps, we just need to update the image tag on the Git repository for that environment and the changes will be pulled to the environment to match the updated state in the Git repository. The magic of keeping the deployed state in sync with state-defined on Git is achieved with the help of operators/agents. The operator is like a control loop which can identify differences between the deployed state and the desired state and make sure they are the same.

    Key benefits of GitOps:

    1. All the changes are verifiable and auditable as they make their way into the system through Git repositories.
    2. Easy and consistent replication of the environment as Git repository is the single source of truth. This makes disaster recovery much quicker and simpler.
    3. More developer-centric experience for operating infrastructure. Also a smaller learning curve for deploying dev environments.
    4. Consistent rollback of application as well as infrastructure state.

    Introduction to Argo CD

    Argo CD is a continuous delivery tool that works on the principles of GitOps and is built specifically for Kubernetes. The product was developed and open-sourced by Intuit and is currently a part of CNCF.

    Key components of Argo CD:

    1. API Server: Just like K8s, Argo CD also has an API server that exposes APIs that other systems can interact with. The API server is responsible for managing the application, repository and cluster credentials, enforcing authentication and authorization, etc.
    2. Repository server: The repository server keeps a local cache of the Git repository, which holds the K8s manifest files for the application. This service is called by other services to get the K8s manifests.  
    3. Application controller: The application controller continuously watches the deployed state of the application and compares it with the desired state, reports to the API server whenever they are not in sync with each other, and, depending on the sync policy, takes corrective actions as well. It is also responsible for executing user-defined hooks for various lifecycle events of the application.

    Key objects/resources in Argo CD:

    1. Application: Argo CD allows us to represent the instance of the application which we want to deploy in an environment by creating Kubernetes objects of a custom resource definition(CRD) named Application. In the specification of Application type objects, we specify the source (repository) of our application’s K8s manifest files, the K8s server where we want to deploy those manifests, namespace, and other information.
    2. AppProject: Just like Application, Argo CD provides another CRD named AppProject. AppProjects are used to logically group related applications.
    3. Repo Credentials: In the case of private repositories, we need to provide access credentials. For credentials, Argo CD uses K8s secrets and a config map. First, we create objects of the Secret type, and then we update a special-purpose config map named argocd-cm with the repository URL and the secret which contains the credentials (a sketch is shown after this list).
    4. Cluster Credentials: Along with Git repository credentials, we also need to provide the K8s cluster credentials. These credentials are also managed using K8s secret, we are required to add the label argocd.argoproj.io/secret-type: cluster to these secrets.
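
    For illustration, a minimal sketch of how a private Git repository credential could be wired up declaratively (the secret name, repository URL, and credential values here are assumptions):

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-repo-creds
      namespace: argocd
    stringData:
      username: git-user
      password: git-token
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: argocd-cm
      namespace: argocd
      labels:
        app.kubernetes.io/part-of: argocd
    data:
      repositories: |
        - url: https://github.com/example/private-manifests.git
          usernameSecret:
            name: my-repo-creds
            key: username
          passwordSecret:
            name: my-repo-creds
            key: password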

    Demo:

    Enough of theory, let’s try out the things we discussed above. For the demo, I have created a simple app named message-app. This app reads a message set in the environment variable named MESSAGE. We will populate the values of this environment variable using a K8s config map. I have kept the K8s manifest files for the app in a separate repository. We have the application and the K8s manifest files ready. Now we are all set to install Argo CD and deploy our application.

    Installing Argo CD:

    For installing Argo CD, we first need to create a namespace named argocd.

    kubectl create namespace argocd
    kubectl apply -n argocd -f https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml

    Applying the files from the argo-cd repo directly is fine for demo purposes, but in actual environments, you should copy the files into your own repository before applying them.

    We can see that this command has created the core components and CRDs we discussed earlier in the blog. There are some additional resources as well but we can ignore them for the time being.

    Accessing the Argo CD GUI

    We now have Argo CD running in our cluster. Argo CD also provides a GUI which gives us a graphical representation of our K8s objects. It allows us to view events, pod logs, and other configurations.

    By default, the GUI service is not exposed outside the cluster. Let us update its service type to LoadBalancer so that we can access it from outside.

    kubectl patch svc argocd-server -n argocd -p '{"spec": {"type": "LoadBalancer"}}'

    After this, we can use the external IP of the argocd-server service and access the GUI. 

    The initial username is admin and the password is the name of the api-server pod. The password can be obtained by listing the pods in the argocd namespace or directly by this command.

    kubectl get pods -n argocd -l app.kubernetes.io/name=argocd-server -o name | cut -d'/' -f 2 
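    On newer Argo CD releases, the initial admin password is no longer the pod name; it is auto-generated and stored in a secret named argocd-initial-admin-secret. If the pod-name password does not work, a command along these lines should retrieve it:

    kubectl get secret argocd-initial-admin-secret -n argocd -o jsonpath="{.data.password}" | base64 -d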

    Deploy the app:

    Now let’s go ahead and create an Application for the staging environment of our message app.

    apiVersion: argoproj.io/v1alpha1
    kind: Application
    metadata:
      name: message-app-staging
      namespace: argocd
      labels:
        environment: staging
      finalizers:
        - resources-finalizer.argocd.argoproj.io
    spec:
      project: default
    
      # Source of the application manifests
      source:
        repoURL: https://github.com/akash-gautam/message-app-manifests.git
        targetRevision: HEAD
        path: manifests/staging
    
      # Destination cluster and namespace to deploy the application
      destination:
        server: https://kubernetes.default.svc
        namespace: staging
    
      syncPolicy:
        automated:
          prune: false
          selfHeal: false

    In the application spec, we have specified the repository where our manifest files are stored, as well as the path to the files within the repository.

    We want to deploy our app in the same k8s cluster where Argo CD is running, so we have specified the in-cluster API server URL (https://kubernetes.default.svc) as the destination. We want the resources to be deployed in the staging namespace, so we have set it accordingly.

    In the sync policy, we have enabled automated sync. We have kept the project as default. 

    Adding the resources-finalizer.argocd.argoproj.io ensures that all the resources created for the application are deleted when the Application is deleted. This is fine for our demo setup but might not always be desirable in real-life scenarios.

    Our Git repos are public, so we don’t need to create secrets for Git repo credentials.

    We are deploying in the same cluster where Argo CD itself is running, so Argo CD can use its in-cluster access and, for this demo setup, we don’t need to create secrets for cluster credentials either.

    Now let’s go ahead and create the application and see the magic happen.

    kubectl apply -f message-app-staging.yaml

    As soon as the application is created, we can see it on the GUI. 

    By clicking on the application, we can see all the Kubernetes objects created for it.

    It also shows the objects which are indirectly created by the objects we create. In the above image, we can see the ReplicaSet and Endpoints objects, which are created as a result of creating the Deployment and Service respectively.

    We can also click on the individual objects and see their configuration. For pods, we can see events and logs as well.

    As our app is deployed now, we can grab the public IP of the message-app service and access it in the browser.

    We can see that our app is deployed and accessible.

    Updating the app

    For updating our application, all we need to do is commit our changes to the GitHub repository. We know the message-app just displays the message we pass to it via a ConfigMap, so let’s update the message and push it to the repository.

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: message-configmap
      labels:
        app: message-app
    data:
      MESSAGE: "This too shall pass" #Put the message you want to display here.

    Once the commit is done, Argo CD will start to sync again.

    Once the sync is done, we will restart our message-app pod (for example, with the command below) so that it picks up the latest values in the config map. Then we need to refresh the browser to see the updated value.
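    Assuming the Deployment in the manifest repository is named message-app (the actual name is defined there), a restart could look like this:

    kubectl rollout restart deployment message-app -n staging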

    As we discussed earlier, to make any changes to an environment, we just need to update the repo that is used as the source for that environment, and the changes will get pulled into the environment.

    We can follow exactly the same approach to deploy the application to the production environment as well. We just need to create a new Application object and set the manifest path and deployment namespace accordingly, for example as sketched below.
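    A sketch of such a production Application, assuming the production manifests live under manifests/production in the same repository (the path and names are assumptions of this sketch):

    apiVersion: argoproj.io/v1alpha1
    kind: Application
    metadata:
      name: message-app-production
      namespace: argocd
      labels:
        environment: production
      finalizers:
        - resources-finalizer.argocd.argoproj.io
    spec:
      project: default
      source:
        repoURL: https://github.com/akash-gautam/message-app-manifests.git
        targetRevision: HEAD
        path: manifests/production   # assumed location of production manifests
      destination:
        server: https://kubernetes.default.svc
        namespace: production
      syncPolicy:
        automated:
          prune: false
          selfHeal: false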

    Conclusion: 

    It’s still early days for GitOps, but it has already been implemented successfully at scale by many organizations. As GitOps tools mature along with the ever-growing adoption of Kubernetes, I think many more organizations will consider adopting GitOps soon. GitOps is not limited to Kubernetes, but the completely declarative nature of Kubernetes makes GitOps simpler to achieve. Argo CD is a deployment tool tailored for Kubernetes that allows us to do deployments in a Kubernetes-native way while following the principles of GitOps. I hope this blog helped you understand the how, what, and why of GitOps and gave you some insights into Argo CD.

  • Building a WebSocket Service with AWS Lambda & DynamoDB

    WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

    Prerequisites

    A basic understanding of real-time web applications will help with this implementation. Throughout this article, we will be using the Serverless Framework to develop and deploy the WebSocket service, and Node.js to write the business logic.

    Behind the scenes, Serverless uses CloudFormation to create the various required resources, like API Gateway APIs, AWS Lambda functions, IAM roles and policies, etc.

    Why Serverless?

    The Serverless Framework abstracts the complex syntax needed for creating CloudFormation stacks and helps us focus on the business logic of our services. Along with that, a variety of plugins are available that make developing serverless applications easier.

    Why DynamoDB?

    We need persistent storage for WebSocket connection data, along with AWS Lambda. DynamoDB, a serverless key-value database from AWS, offers low latency, making it a great fit for storing and retrieving WebSocket connection details.

    Overview

    In this application, we’ll be creating an AWS Lambda service that accepts WebSocket connections coming via API Gateway. Connections and subscriptions to topics are persisted in DynamoDB. We will use the ws library to implement basic WebSocket clients for the demonstration. The implementation has a single WebSocket-consuming Lambda that receives the connections and handles the communication.

    Base Setup

    We will be using the default Node.js boilerplate offered by Serverless as a starting point.

    serverless create --template aws-nodejs

    A few Serverless plugins are installed and used to speed up the development and deployment of the Serverless stack (an example install command follows). We also add the webpack config given here to support the latest JS syntax.
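    These are the plugins declared later in serverless.yaml; installing them as dev dependencies could look like this:

    npm install --save-dev serverless-dynamodb-local serverless-plugin-existing-s3 serverless-dotenv-plugin serverless-webpack serverless-offline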

    Adding Lambda role and policies:

    The lambda function requires a role attached to it with enough permissions to access DynamoDB and the API Gateway Management API (execute-api). These are the links to the configuration files:

    Link to dynamoDB.yaml

    Link to lambdaRoles.yaml
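    The linked files are not reproduced in this post, but the code below assumes a topics table roughly along these lines. The table name, billing mode, and the connectionId index are assumptions of this sketch:

    TopicsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: topics
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: topic
            AttributeType: S
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: topic          # partition key: the topic name
            KeyType: HASH
          - AttributeName: connectionId   # sort key: the subscribed connection
            KeyType: RANGE
        GlobalSecondaryIndexes:
          - IndexName: connectionId-index # assumed index to look up a connection's topics
            KeySchema:
              - AttributeName: connectionId
                KeyType: HASH
            Projection:
              ProjectionType: ALL
        TimeToLiveSpecification:
          AttributeName: ttl              # stale subscriptions expire automatically
          Enabled: true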

    Adding custom config for plugins:

    The plugins used for local development must have their custom config added in the serverless.yaml file.

    This is how our serverless.yaml file should look after the base serverless configuration:

    service: websocket-app
    frameworkVersion: '2'
    custom:
     dynamodb:
       stages:
         - dev
       start:
         port: 8000
         inMemory: true
         heapInitial: 200m
         heapMax: 1g
         migrate: true
         convertEmptyValues: true
     webpack:
       keepOutputDirectory: true
       packager: 'npm'
       includeModules:
         forceExclude:
           - aws-sdk
     
    provider:
     name: aws
     runtime: nodejs12.x
     lambdaHashingVersion: 20201221
    plugins:
     - serverless-dynamodb-local
     - serverless-plugin-existing-s3
     - serverless-dotenv-plugin
     - serverless-webpack
     - serverless-offline
    resources:
     - Resources: ${file(./config/dynamoDB.yaml)}
     - Resources: ${file(./config/lambdaRoles.yaml)}
    functions:
     hello:
       handler: handler.hello

    Add WebSocket Lambda:

    We need to create a lambda function that accepts WebSocket events from API Gateway. As you can see, we’ve defined 3 WebSocket events for the lambda function.

    • $connect
    • $disconnect
    • $default

    These 3 events correspond to the default routes that come with the API Gateway WebSocket offering. $connect and $disconnect are used for initialization and termination of the socket connection, while the $default route is used for data transfer.

    functions:
     websocket:
       handler: lambda/websocket.handler
       events:
         - websocket:
             route: $connect
         - websocket:
             route: $disconnect
         - websocket:
             route: $default

    We can go ahead and update how data is sent and add custom WebSocket routes to the application.

    The lambda needs to establish a connection with the client and handle the subscriptions. The logic for updating DynamoDB is written in a utility class named Client. Whenever a connection is received, we create a record in the topics table.

    console.log(`Received socket connectionId: ${event.requestContext && event.requestContext.connectionId}`);
    if (!(event.requestContext && event.requestContext.connectionId)) {
        throw new Error('Invalid event. Missing `connectionId` parameter.');
    }
    const connectionId = event.requestContext.connectionId;
    const route = event.requestContext.routeKey;
    console.log(`data from ${connectionId} ${event.body}`);
    const connection = new Client(connectionId);
    const response = { statusCode: 200, body: '' };

    // $connect: persist the new connection in the topics table
    if (route === '$connect') {
        console.log(`Route ${route} - Socket connected: ${connectionId}`);
        await connection.connect();
        return response;
    }

    The Client utility class internally creates a record for the given connectionId in the DynamoDB topics table.

    async subscribe({ topic, ttl }) {
      // Create a record for this connectionId under the given topic.
      // The ttl attribute lets DynamoDB expire stale subscriptions automatically.
      return dynamoDBClient
        .put({
          Item: {
            topic,
            connectionId: this.connectionId,
            ttl: typeof ttl === 'number' ? ttl : Math.floor(Date.now() / 1000) + 60 * 60 * 2,
          },
          TableName: process.env.TOPICS_TABLE,
        }).promise();
    }
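    The connect() call used on the $connect route is not shown in this post; a minimal sketch of it, assuming it simply records the connection under the INITIAL_CONNECTION sentinel topic mentioned below, could be:

    // Hypothetical sketch: record the bare connection under a sentinel topic
    // so it can be cleaned up when the client disconnects.
    async connect() {
      return this.subscribe({ topic: 'INITIAL_CONNECTION' });
    }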

    Similarly, for the $disconnect route, we remove the INITIAL_CONNECTION topic record when a client disconnects.

    else if (route === '$disconnect') {
        console.log(`Route ${route} - Socket disconnected: ${event.requestContext.connectionId}`);
        await connection.unsubscribe();
        return response;
    }

    The client.unsubscribe method internally removes the connection entry from the DynamoDB table. Here, the getTopics method fetches all the topics the particular client has subscribed to.

    async unsubscribe() {
      const topics = await this.getTopics();
      if (!topics) {
        throw Error(`Topics got undefined`);
      }
      return this.removeTopics({
        [process.env.TOPICS_TABLE]: topics.map(({ topic, connectionId }) => ({
          DeleteRequest: { Key: { topic, connectionId } },
        })),
      });
    }
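    getTopics itself is not shown here either; a rough sketch, assuming the connectionId-index GSI from the table sketch earlier, might look like this:

    // Hypothetical sketch: look up every topic record stored for this connection.
    async getTopics() {
      const result = await dynamoDBClient
        .query({
          TableName: process.env.TOPICS_TABLE,
          IndexName: 'connectionId-index', // assumed GSI keyed on connectionId
          KeyConditionExpression: 'connectionId = :connectionId',
          ExpressionAttributeValues: { ':connectionId': this.connectionId },
        })
        .promise();
      return result.Items;
    }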

    Now comes the $default route part of the lambda, where we customize message handling. In this implementation, we route message handling based on event.body.type, which indicates what kind of message the client has sent to the server. The subscribe type is used to subscribe to new topics. Similarly, the message type is used to receive a message from one client and publish it to the other clients who have subscribed to the same topic as the sender.

    console.log(`Route ${route} - data from ${connectionId}`);
    if (!event.body) {
        return response;
    }
    const body = JSON.parse(event.body);
    const topic = body.topic;
    if (body.type === 'subscribe') {
        // Register this connection as a subscriber of the topic
        await connection.subscribe({ topic });
        console.log(`Client subscribing for topic: ${topic}`);
    }
    if (body.type === 'message') {
        // Fan the message out to every subscriber of the topic
        await new Topic(topic).publishMessage({ data: body.message });
        console.log(`Published messages to subscribers`);
        return response;
    }
    return response;

    Similar to $connect, the subscribe type of payload, when received, creates a new subscription for the mentioned topic.

    Publishing the messages

    Here is the interesting part of this lambda. When a client sends a payload with type message, the lambda calls the publishMessage method with the data received. The method gets the active subscribers for the topic and publishes messages to each of them using another utility, TopicSubscriber.sendMessage.

    async publishMessage(data) {
      const subscribers = await this.getSubscribers();
      const promises = subscribers.map(async ({ connectionId, subscriptionId }) => {
        const TopicSubscriber = new Client(connectionId);
        const res = await TopicSubscriber.sendMessage({
          id: subscriptionId,
          payload: { data },
          type: 'data',
        });
        return res;
      });
      return Promise.all(promises);
    }

    sendMessage posts the message to the connection via the API Gateway Management API endpoint, which is the API Gateway URL after deployment. As we’re using serverless-offline for local development, the IS_OFFLINE env variable is set automatically.

    const endpoint = process.env.IS_OFFLINE ? 'http://localhost:3001' : process.env.PUBLISH_ENDPOINT;
    console.log('publish endpoint', endpoint);
    const gatewayClient = new ApiGatewayManagementApi({
      apiVersion: '2018-11-29',
      credentials: config, // AWS credentials configured elsewhere in the module
      endpoint,
    });
    return gatewayClient
      .postToConnection({
        ConnectionId: this.connectionId,
        Data: JSON.stringify(message),
      })
      .promise();

    Instead of manually invoking the API endpoint, we can also use DynamoDB streams to trigger a lambda asynchronously and publish messages to topics.
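    A rough sketch of what that stream wiring could look like in serverless.yaml, assuming the topics table resource has a DynamoDB stream enabled and a streamPublisher handler exists (both hypothetical here):

    functions:
     streamPublisher:
       handler: lambda/streamPublisher.handler   # hypothetical handler that publishes on table changes
       events:
         - stream:
             type: dynamodb
             arn:
               Fn::GetAtt: [TopicsTable, StreamArn]   # requires StreamSpecification on the table
             startingPosition: LATEST
             batchSize: 10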

    Implementing the client

    For testing the socket implementation, we will be using a Node.js script, ws-client.js. It creates two ws clients: one that sends data and another that receives it.

    const WebSocket = require('ws');

    // serverless-offline exposes the WebSocket endpoint on port 3001 by default
    const socketEndpoint = 'ws://localhost:3001';
    const ws1 = new WebSocket(socketEndpoint, {
     perMessageDeflate: false
    });
    const ws2 = new WebSocket(socketEndpoint, {
     perMessageDeflate: false
    });

    On connect, the first client sends data to a topic named general at a fixed interval (15 seconds in the snippet below), incrementing a counter on each send.

    ws1.on('open', () => {
      console.log('WS1 connected');
      let count = 0;
      setInterval(() => {
        const data = {
          type: 'message',
          message: `count is ${count}`,
          topic: 'general'
        };
        const message = JSON.stringify(data);
        ws1.send(message, (err) => {
          if (err) {
            console.log(`Error occurred while sending data ${err.message}`);
            return;
          }
          console.log(`WS1 OUT ${message}`);
        });
        count++;
      }, 15000);
    });

    The second client on connect will first subscribe to the general topic and then attach a handler for receiving data.

    ws2.on('open', () => {
      console.log('WS2 connected');
      const data = {
        type: 'subscribe',
        topic: 'general'
      };
      ws2.send(JSON.stringify(data), (err) => {
        if (err) {
          console.log(`Error occurred while sending data ${err.message}`);
        }
      });
    });
    ws2.on('message', (message) => {
      console.log(`ws2 IN ${message}`);
    });

    Once the service is running, we should be able to see output like the following, with the two clients successfully sending and receiving messages through our socket server.

    Conclusion

    With API Gateway WebSocket support and DynamoDB, we’re able to implement persistent socket connections using serverless functions. The implementation can be extended and made as complex as needed.
