Category: Type

  • Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!

    The amount of data in our world is growing at an explosive rate, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified, distributed data processing engine built for Big Data. It lets you process Big Data efficiently by splitting the work into chunks and assigning those chunks to computation resources across the nodes of a cluster. It can handle up to petabytes of data (a petabyte is a million gigabytes), and because it processes data in memory, it is fast.

    We talked about processing Big Data in Spark, but Spark itself does not store data the way a file system or database does. So, to process data in Spark, we must read data from a data source, clean or transform it, and write the result back to a target data source. Data sources can be files, APIs, databases, or streams.

    Database management systems have been around for decades. Many applications generate huge amounts of data and store it in these systems, and quite often we need to connect Spark to a database and process that data.

    In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on different methods of reading and writing data, which help us move terabytes of data efficiently.

    Reading / Writing data from/to Database using Spark:

    To read or write data from/to a database, we perform a few basic steps regardless of the programming language or framework we are using. What follows is an overview of those steps.

    Step 1: Register Driver or Use Connector

    Get the respective driver of your database and register the driver, or use the connector to connect to the database.

    Step 2: Make a connection

    Next, the driver or connector makes a connection to the database.

    Step 3: Run query statement

    Using the connection created in the previous step, execute the query, which will return the result.

    Step 4: Process result

    Process the result obtained in the previous step as per your requirements.

    Step 5: Close the connection

    Finally, close the connection to release the database resources it holds.
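    The five steps above can be sketched in Python with the standard library’s sqlite3 module (used here purely for illustration; the table and values are hypothetical stand-ins for the dataset below):

    ```python
    import sqlite3

    # Steps 1 & 2: the SQLite driver ships with the standard library;
    # connect() registers it implicitly and makes a connection.
    conn = sqlite3.connect(":memory:")

    # Step 3: run query statements through a cursor.
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")
    cursor.execute("INSERT INTO covid_data VALUES ('Uttarakhand', 3259)")
    cursor.execute("SELECT state, confirmed FROM covid_data")

    # Step 4: process the result as required.
    rows = cursor.fetchall()
    print(rows)  # [('Uttarakhand', 3259)]

    # Step 5: close the connection to release database resources.
    conn.close()
    ```

    The same five-step shape reappears in the psycopg2 programs later in this post, just with a network connection instead of an in-memory file.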

    Dataset we are using:

    Covid data

    This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.

    You can load this dataset in any of the databases you work with and can try out the entire discussion practically.

    The following image shows ten records of the entire dataset.

    Single-partition Spark program:

    from pyspark.sql import SparkSession
    
    ## Creating a Spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "aniket"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "aniket") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "spark_schema.zipcode_table") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .save()

    Spark provides an API to read data from a database, and it is very simple to use. First of all, we need to create a Spark session and add the database driver to Spark. The driver can be added through the program itself, as above, or passed in from the shell when submitting the job.
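    For example, the driver package can be supplied at submit time instead of in code (a sketch; the script name main.py is just the one from the output logs below):

    ```shell
    # Resolve and attach the Postgres JDBC driver when submitting the job
    spark-submit --packages org.postgresql:postgresql:42.2.8 main.py
    ```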

    The first line of code imports the SparkSession class, which is the entry point to programming Spark with the Dataset and DataFrame API.

    Next, we create a Spark session, which will be used for interaction with our Spark application; master("local") runs Spark on the local machine. We specify a name for our application using appName(), which in our case is ‘Databases’; this name is shown on the web UI for the cluster. We can then specify any configuration for the Spark application using config(). In our case, we specify the Postgres JDBC driver package, which will be used to create a connection with the Postgres database. You can specify the driver of any of the available databases instead.

    To connect to the database, we must have a hostname, port, database name, username, and password. Those details are assigned right after the session is created, and combined into the JDBC URL.

    Now look at the read block in the snippet above. Up until now, we have our Spark session and all the information we need to connect to the database. Using the Spark read API, we read the data from the database. This creates a connection to the Postgres database from one of the cores allocated to the Spark application and reads the data into the table_data_df DataFrame. Even if we have multiple cores for our application, Spark still creates only one connection from one core; the rest of the cores are not utilized. We will discuss how to utilize all the cores shortly, but our first focus is the basic read.

    Now look at the write block in the snippet above. We have the data, so let’s try to write it back to the database using the Spark write API. This, too, creates only one connection to the database from one of the allocated cores. Even if we have more cores for the application, the code above still uses only one.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 113ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/5ms)
    22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    Multiple-partition Spark program:

    from pyspark.sql import SparkSession
    
    ## Creating a spark session and adding Postgres Driver to spark.
    spark_session = SparkSession.builder \
        .master("local[4]") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    partition_column = 'date'
    lower_bound = '2021-02-20'
    upper_bound = '2021-02-28'
    num_partitions = 4
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("partitionColumn", partition_column) \
        .option("lowerBound", lower_bound) \
        .option("upperBound", upper_bound) \
        .option("numPartitions", num_partitions) \
        .load()
    
    table_data_df.show()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data_output") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("numPartitions", num_partitions) \
        .save()

    As promised in the last section, let’s discuss how to optimize resource utilization. Previously we had only one connection, utilizing very limited resources and leaving the rest idle. To get past this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These three must all be specified if any of them is, and numPartitions must be specified as well. Together, they describe how to partition the table when reading in parallel from multiple workers. Each partition is handled by an individual core with its own connection performing the reads or writes, making the database operation parallel.

    This is a much more efficient way of reading and writing database data in Spark than doing it with just one partition.

    Spark decides the partitions in the following way.

    Let’s consider an example where:

    lowerBound: 0

    upperBound: 1000

    numPartitions: 10

    The stride is (upperBound - lowerBound) / numPartitions = 100, and the partitions correspond to the following queries:

    SELECT * FROM table WHERE partitionColumn < 100 OR partitionColumn IS NULL

    SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200

    ...

    SELECT * FROM table WHERE partitionColumn >= 900

    Each range is inclusive on its lower end and exclusive on its upper end, and the first and last partitions are left open-ended so that rows falling outside [lowerBound, upperBound) are not dropped.
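    To make the stride concrete, here is a small Python sketch (not Spark code, just a mirror of how the JDBC source turns lowerBound, upperBound, and numPartitions into per-partition predicates for a numeric column):

    ```python
    def partition_predicates(column, lower_bound, upper_bound, num_partitions):
        """Mimic Spark's JDBC partitioning: stride = (upper - lower) / numPartitions.
        The first and last partitions are left open so no rows are lost."""
        stride = (upper_bound - lower_bound) // num_partitions
        predicates = []
        for i in range(num_partitions):
            lo = lower_bound + i * stride
            hi = lo + stride
            if i == 0:
                predicates.append(f"{column} < {hi} OR {column} IS NULL")
            elif i == num_partitions - 1:
                predicates.append(f"{column} >= {lo}")
            else:
                predicates.append(f"{column} >= {lo} AND {column} < {hi}")
        return predicates

    preds = partition_predicates("partitionColumn", 0, 1000, 10)
    print(preds[0])   # partitionColumn < 100 OR partitionColumn IS NULL
    print(preds[1])   # partitionColumn >= 100 AND partitionColumn < 200
    print(preds[-1])  # partitionColumn >= 900
    ```

    Each predicate becomes the WHERE clause of one partition’s query, and each runs on its own connection.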

    Now we have data in multiple partitions. Each executor can hold one or more partitions, depending on the cluster configuration. Suppose we have 10 cores and 10 partitions: one partition of data is fetched by one task on one core, so all 10 partitions can be fetched concurrently. Each of these tasks creates its own connection to the database and reads its slice of the data.

    Note – lowerBound and upperBound do not filter the data; they only help Spark decide the stride. Also, partitionColumn must be a numeric, date, or timestamp column from the table.

    There are also some attributes that can optimize the write operation. One is “batchsize”, the JDBC batch size, which determines how many rows are inserted per round trip; it can improve the performance of JDBC drivers and applies only to writing. Another helpful attribute is “truncate”, a JDBC writer-related option: when SaveMode.Overwrite is enabled, it causes Spark to truncate the existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
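    As a hedged sketch, the write from the program above could carry these extra options (the batch size value is an arbitrary example, not a recommendation):

    ```python
    # JDBC writer options; "batchsize" and "truncate" apply only on the write path.
    jdbc_write_options = {
        "url": "jdbc:postgresql://localhost:5432/postgres",
        "dbtable": "covid_data_output",
        "user": "postgres",
        "password": "pass@123",
        "driver": "org.postgresql.Driver",
        "batchsize": "10000",  # rows inserted per JDBC round trip (example value)
        "truncate": "true",    # with mode("overwrite"): TRUNCATE instead of DROP + CREATE
    }
    # table_data_df.write.format("jdbc").options(**jdbc_write_options).mode("overwrite").save()
    ```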

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 104ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/4ms)
    22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    We have seen how to read and write data in Spark, but Spark is not the only way to connect to databases, right? There are multiple ways to access a database and achieve parallel reads and writes. We will discuss them in the following sections, focusing mainly on reading and writing from Python.

    Single-thread Python program:

    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username,password=password)
        data = pd.DataFrame(db_client.read(query))
        print(data)

    To integrate Postgres with Python, we have different libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the psycopg2 library (Psycopg2 is the current generation of the Psycopg adapter); you can install it using pip or any way you are comfortable with.

    To connect with the Postgres database, we need a hostname, port, database name, username, and password; we store all these details as attributes of the class. The create_conn() method forms a connection with the Postgres database using the connect() method of the psycopg2 module and returns a connection object.
    In the read method, we call create_conn() and get a connection object, from which we create a cursor. The cursor is bound to its connection for its lifetime and executes all the commands or queries on the database. Using this cursor, we execute a read query; the data returned by the query can then be fetched using the fetchall() method. Finally, we close the cursor and the connection.

    To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the data, which we convert into tabular form using pandas.
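    Since cursor.fetchall() returns plain tuples without column names, we can pass the names ourselves when building the DataFrame (a small sketch; the sample row is hypothetical, and the column list matches the dataset described earlier):

    ```python
    import pandas as pd

    # fetchall() yields a list of tuples; supply column names explicitly.
    rows = [
        ("Uttarakhand", "Almora", 3259, 3081, 25, 127, 84443, "2021-02-24"),
    ]
    columns = ["state", "district", "confirmed", "recovered",
               "deceased", "other", "tested", "date"]
    data = pd.DataFrame(rows, columns=columns)
    print(data.loc[0, "district"])  # Almora
    ```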

    This implementation is very straightforward: the program creates one process in our system and fetches all the data using that process’s CPU and memory. The drawback of this approach is that if the program uses, say, 30 percent of the available CPU and memory, the remaining 70 percent sits idle. We can maximize this usage through other means like multithreading or multiprocessing.

    Output of Program:

    Connecting to the Postgres database...
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
                                  0              1    2   3  4  5     6           7
    0   Andaman and Nicobar Islands        Unknown   33  11  0  0  None  2020-04-26
    1                Andhra Pradesh      Anantapur   53  14  4  0  None  2020-04-26
    2                Andhra Pradesh       Chittoor   73  13  0  0  None  2020-04-26
    3                Andhra Pradesh  East Godavari   39  12  0  0  None  2020-04-26
    4                Andhra Pradesh         Guntur  214  29  8  0  None  2020-04-26
    ..                          ...            ...  ...  .. .. ..   ...         ...
    95                        Bihar         Araria    1   0  0  0  None  2020-04-30
    96                        Bihar          Arwal    4   0  0  0  None  2020-04-30
    97                        Bihar     Aurangabad    8   0  0  0  None  2020-04-30
    98                        Bihar          Banka    3   0  0  0  None  2020-04-30
    99                        Bihar      Begusarai   11   5  0  0  None  2020-04-30
    
    [100 rows x 8 columns]
    
    Process finished with exit code 0

    Multi-thread Python program:

    In the previous section, we discussed the drawback of a single-process, single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.

    What is a process?

    When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.

    What is a thread?

    A thread is a sequential flow of execution. Every process has at least one thread, usually called the main thread. Unlike separate processes, multiple threads within a process share the same computing and memory resources.

    What is multithreading?

    This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.
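    As a minimal illustration (not database-related yet), here are two threads sharing the same process memory and running concurrently:

    ```python
    import threading

    results = []  # shared memory: both threads append to the same list

    def worker(name):
        # Each thread runs this function independently but concurrently.
        results.append(f"done: {name}")

    t1 = threading.Thread(target=worker, args=("first",))
    t2 = threading.Thread(target=worker, args=("second",))
    t1.start()
    t2.start()
    t1.join()  # wait for both threads to finish
    t2.join()
    print(sorted(results))  # ['done: first', 'done: second']
    ```

    The order in which the two appends happen is up to the operating system’s scheduler, which is why we sort before printing.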

    What is multiprocessing?

    When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is typically backed by multiple processors (or cores), each running a separate process.

    Let’s get back to our program. Here we have the connection and read methods, which are exactly the same as in the previous section, plus one new function, get_thread(). Note the distinction: a method belongs to a class, while this function does not. So get_thread() is global and acts as a wrapper for calling the read method of the class PostgresDbClient, bundling the per-thread logging together with the read. Don’t worry if the split feels arbitrary; it is just how this code is organized.

    To run the program, we have specified the Postgres database details and queries. In the previous approach, we fetched all the data from the table with one thread only. In this approach, the plan is to fetch one day of data using one thread so that we can maximize resource utilization. Here, each query reads one day’s worth of data from the table using one thread. Having 5 queries will fetch 5 days of data, and 5 threads will be running concurrently.

    To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. Thread() creates a new thread object, but the thread does not run until we call its start() method. In our program, we start 5 threads. If you execute the program multiple times, you will see different results: some data is fetched earlier, some later, and the order changes between runs. This is because thread scheduling is handled by the operating system, and the output depends on which thread the OS gives resources to at what time. To understand how that works, you would need to go deep into operating systems concepts.

    In our use case, we are just printing the data to the console. There are multiple ways to store it instead. One simple way is to define a global variable and store the results in it, but then we need synchronization, since multiple threads accessing the same variable can lead to race conditions. Another way is to subclass the Thread class and store the result in a class variable; here, too, you must ensure synchronization when the results are combined.
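    A hedged sketch of the global-variable approach with a lock guarding the shared list (the thread bodies are illustrative placeholders, not the actual database read):

    ```python
    import threading

    all_rows = []              # shared result container
    lock = threading.Lock()    # guards all_rows against race conditions

    def fetch_partition(rows):
        # Stand-in for a per-thread database read returning `rows`.
        with lock:             # only one thread mutates the shared list at a time
            all_rows.extend(rows)

    threads = [
        threading.Thread(target=fetch_partition, args=([("2020-04-26", 53)],)),
        threading.Thread(target=fetch_partition, args=([("2020-04-27", 61)],)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(len(all_rows))  # 2
    ```

    The lock keeps the shared list consistent, but it also serializes exactly the part of the work we hoped to parallelize, which is the trade-off discussed next.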

    So, whenever we want to store the data in a shared variable, by any of these methods, we need synchronization, and synchronization pushes the threads toward sequential execution, which is not what we are looking for.
    To avoid synchronization, we can write the data directly to the target: the thread that reads the data also writes it back to the target database. This way, we avoid shared state and still have the data stored in the database for future use. The function can look as below, where db_client.write(data) is a function that writes the data to a database.

    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        db_client.write(data)
        print(f"Stopping thread id {thread_id}")

    Python Program:

    import threading
    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        print(f"Stopping thread id {thread_id}")
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        partition_column = 'date'
        lower_bound = '2020-04-26'
        upper_bound = '2020-04-30'
        num_partitions = 5
    
    
        # one single-day query per thread, so each thread reads a distinct partition
        query1 = f"select * from {table_name} where {partition_column} = '2020-04-26'"
        query2 = f"select * from {table_name} where {partition_column} = '2020-04-27'"
        query3 = f"select * from {table_name} where {partition_column} = '2020-04-28'"
        query4 = f"select * from {table_name} where {partition_column} = '2020-04-29'"
        query5 = f"select * from {table_name} where {partition_column} = '2020-04-30'"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username,password=password)
        x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
        x1.start()
        x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
        x2.start()
        x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
        x3.start()
        x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
        x4.start()
        x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
        x5.start()

    Output of Program:

    Starting thread id 1
    Connecting to the Postgres database...
    Starting thread id 2
    Connecting to the Postgres database...
    Starting thread id 3
    Connecting to the Postgres database...
    Starting thread id 4
    Connecting to the Postgres database...
    Starting thread id 5
    Connecting to the Postgres database...
    Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
    Reading data !!!
    
    Reading data !!!
    
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Thread 2 data 
    Thread 3 data 
    Thread 1 data 
    Thread 5 data 
    Thread 4 data 
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-27
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-27
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-27
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-27
    4                Andhra Pradesh          Guntur  237  ...  0  None  2020-04-27
    5                Andhra Pradesh         Krishna  210  ...  0  None  2020-04-27
    6                Andhra Pradesh         Kurnool  292  ...  0  None  2020-04-27
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-27
    8                Andhra Pradesh  S.P.S. Nellore   79  ...  0  None  2020-04-27
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-27
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-27
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-27
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-27
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-27
    14                        Assam         Unknown   36  ...  0  None  2020-04-27
    15                        Bihar           Arwal    4  ...  0  None  2020-04-27
    16                        Bihar      Aurangabad    7  ...  0  None  2020-04-27
    17                        Bihar           Banka    2  ...  0  None  2020-04-27
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-27
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-27
    
    [20 rows x 8 columns]
    Stopping thread id 2
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-26
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-26
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-26
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-26
    4                Andhra Pradesh          Guntur  214  ...  0  None  2020-04-26
    5                Andhra Pradesh         Krishna  177  ...  0  None  2020-04-26
    6                Andhra Pradesh         Kurnool  279  ...  0  None  2020-04-26
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-26
    8                Andhra Pradesh  S.P.S. Nellore   72  ...  0  None  2020-04-26
    9                Andhra Pradesh      Srikakulam    3  ...  0  None  2020-04-26
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-26
    11               Andhra Pradesh   West Godavari   51  ...  0  None  2020-04-26
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-26
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-26
    14                        Assam         Unknown   36  ...  0  None  2020-04-26
    15                        Bihar           Arwal    4  ...  0  None  2020-04-26
    16                        Bihar      Aurangabad    2  ...  0  None  2020-04-26
    17                        Bihar           Banka    2  ...  0  None  2020-04-26
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-26
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-26
    
    [20 rows x 8 columns]
    Stopping thread id 1
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-28
    1                Andhra Pradesh       Anantapur   54  ...  0  None  2020-04-28
    2                Andhra Pradesh        Chittoor   74  ...  0  None  2020-04-28
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-28
    4                Andhra Pradesh          Guntur  254  ...  0  None  2020-04-28
    5                Andhra Pradesh         Krishna  223  ...  0  None  2020-04-28
    6                Andhra Pradesh         Kurnool  332  ...  0  None  2020-04-28
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-28
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-28
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-28
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-28
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-28
    12               Andhra Pradesh   Y.S.R. Kadapa   65  ...  0  None  2020-04-28
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-28
    14                        Assam         Unknown   38  ...  0  None  2020-04-28
    15                        Bihar          Araria    1  ...  0  None  2020-04-28
    16                        Bihar           Arwal    4  ...  0  None  2020-04-28
    17                        Bihar      Aurangabad    7  ...  0  None  2020-04-28
    18                        Bihar           Banka    3  ...  0  None  2020-04-28
    19                        Bihar       Begusarai    9  ...  0  None  2020-04-28
    
    [20 rows x 8 columns]
    Stopping thread id 3
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-30
    1                Andhra Pradesh       Anantapur   61  ...  0  None  2020-04-30
    2                Andhra Pradesh        Chittoor   80  ...  0  None  2020-04-30
    3                Andhra Pradesh   East Godavari   42  ...  0  None  2020-04-30
    4                Andhra Pradesh          Guntur  287  ...  0  None  2020-04-30
    5                Andhra Pradesh         Krishna  246  ...  0  None  2020-04-30
    6                Andhra Pradesh         Kurnool  386  ...  0  None  2020-04-30
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-30
    8                Andhra Pradesh  S.P.S. Nellore   84  ...  0  None  2020-04-30
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-30
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-30
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-30
    12               Andhra Pradesh   Y.S.R. Kadapa   73  ...  0  None  2020-04-30
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-30
    14                        Assam         Unknown   43  ...  0  None  2020-04-30
    15                        Bihar          Araria    1  ...  0  None  2020-04-30
    16                        Bihar           Arwal    4  ...  0  None  2020-04-30
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-30
    18                        Bihar           Banka    3  ...  0  None  2020-04-30
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-30
    
    [20 rows x 8 columns]
    Stopping thread id 5
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-29
    1                Andhra Pradesh       Anantapur   58  ...  0  None  2020-04-29
    2                Andhra Pradesh        Chittoor   77  ...  0  None  2020-04-29
    3                Andhra Pradesh   East Godavari   40  ...  0  None  2020-04-29
    4                Andhra Pradesh          Guntur  283  ...  0  None  2020-04-29
    5                Andhra Pradesh         Krishna  236  ...  0  None  2020-04-29
    6                Andhra Pradesh         Kurnool  343  ...  0  None  2020-04-29
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-29
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-29
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-29
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-29
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-29
    12               Andhra Pradesh   Y.S.R. Kadapa   69  ...  0  None  2020-04-29
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-29
    14                        Assam         Unknown   38  ...  0  None  2020-04-29
    15                        Bihar          Araria    1  ...  0  None  2020-04-29
    16                        Bihar           Arwal    4  ...  0  None  2020-04-29
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-29
    18                        Bihar           Banka    3  ...  0  None  2020-04-29
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-29
    
    [20 rows x 8 columns]
    Stopping thread id 4
    
    Process finished with exit code 0

    Note that in this blog, we have hardcoded the password as a string, which is definitely not how passwords should be handled. We should use secrets managers, environment variables, or .env files as the source for passwords; we never hardcode passwords in a production environment.

    Conclusion 

    After going through the above blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading concepts. You also now know the difference between multiprocessing and multithreading, and you are able to choose the best way to carry out read and write operations on a database based on your requirements.

    In general, if you have a small amount of data, you can use a simple Python approach to read and write it. If you have a relatively large amount of data, you can use a multi-threaded approach or a single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, which approach to use depends on your requirements and the resources available.

  • A Quick Guide to Building a Serverless Chatbot With Amazon Lex

    Amazon announced “Amazon Lex” in December 2016, and since then we’ve been using it to build bots for our customers. Lex is effectively the technology behind Alexa, Amazon’s voice-activated virtual assistant, which lets people control things with voice commands such as playing music, setting alarms, and ordering groceries. It provides deep learning-powered natural-language understanding along with automatic speech recognition. Amazon now offers it as a service, allowing developers to take advantage of the same features used by Amazon Alexa. So there is no need to spend time setting up and managing infrastructure for your bots.

    Now, developers just need to design conversations according to their requirements in Lex console. The phrases provided by the developer are used to build the natural language model. After publishing the bot, Lex will process the text or voice conversations and execute the code to send responses.

    I’ve put together this quick-start tutorial using which you can start building Lex chat-bots. To understand the terms correctly, let’s consider an e-commerce bot that supports conversations involving the purchase of books.

    Lex-Related Terminologies

    Bot: It consists of all the components related to a conversation, which includes:

    • Intent: An intent represents a goal the bot’s user wants to achieve. In our case, the goal is to purchase books.
    • Utterances: An utterance is a text phrase that invokes intent. If we have more than one intent, we need to provide different utterances for them. Amazon Lex builds a language model based on utterance phrases provided by us, which then invoke the required intent. For our demo example, we need a single intent “OrderBook”. Some sample utterances would be:
    • I want to order some books
    • Can you please order a book for me
    • Slots: Each slot is a piece of data that the user must supply in order to fulfill the intent. For instance, purchasing a book requires bookType and bookName as slots for intent “OrderBook” (I am considering these two factors for making the example simpler, otherwise there are so many other factors based on which one will purchase/select a book.). 
      A slot is an input (a string, date, city, location, boolean, number, etc.) needed to reach the goal of the intent. Each slot has a name, a slot type, a prompt, and a flag indicating whether it is required. The slot type defines the valid values a user can respond with, and can be either custom-defined or one of the Amazon pre-built types.
    • Prompt: A prompt is a question Lex asks the user in order to collect the correct data (for a slot) needed to fulfill an intent, e.g., Lex will ask “What type of book do you want to buy?” to fill the slot bookType.
    • Fulfillment: Fulfillment provides the business logic that is executed once all the slot values needed to achieve the goal have been collected. Amazon Lex supports the use of Lambda functions for fulfillment of business logic and for validations.

    Let’s Implement this Bot!

    Now that we are aware of the basic terminology used in Amazon Lex, let’s start building our chat-bot.

    Creating Lex Bot:

    • Go to the Amazon Lex console, which is available only in the US East (N. Virginia) region, and click the Create button.
    • Create a custom bot by providing following information:
    1. Bot Name: PurchaseBook
    2. Output voice: None, since this is only a text-based application
    3. Set Session Timeout: 5 min
    4. Add Amazon Lex basic role to Bot app: Amazon will create it automatically.  Find out more about Lex roles & permissions here.
    5. Click on Create button, which will redirect you to the editor page.

    Architecting Bot Conversations

    Create Slots: We are creating two slots named bookType and bookName. Slot type values can be chosen from 275 pre-built types provided by Amazon or we can create our own customized slot types.

    Create custom slot type for bookType as shown here and consider predefined type named Amazon.Book for bookName.

    Create Intent: Our bot requires single custom intent named OrderBook.

    Configuring the Intents

    • Utterances: Provide some utterances to invoke the intent. An utterance can consist only of Unicode characters, spaces, and valid punctuation marks. Valid punctuation marks are periods for abbreviations, underscores, apostrophes, and hyphens. If there is a slot placeholder in your utterance, ensure that it’s in the {slotName} format and has spaces at both ends.

    Slots: Map slots to their types and provide the prompt questions that need to be asked to get a valid value for each slot. Note the sequence; the bot will ask the questions according to their priority.

    Confirmation prompt: This is optional. If required you can provide a confirmation message e.g. Are you sure you want to purchase book named {bookName}?, where bookName is a slot placeholder.

    Fulfillment: Now that all the necessary data has been gathered by the chatbot, it can be passed to a Lambda function, or the parameters can be returned to the client application, which then calls a REST endpoint.

    Creating Amazon Lambda Functions

    Amazon Lex supports Lambda functions as code hooks for the bot. These functions can serve multiple purposes, such as improving user interaction with the bot by using prior knowledge, validating the input data the bot receives from the user, and fulfilling the intent.

    • Go to AWS Lambda console and choose to Create a Lambda function.
    • Select blueprint as blank function and click next.
    • To configure your Lambda function, provide its name, runtime, and the code to be executed when the function is invoked. The code can also be uploaded as a zip archive instead of being provided inline. We are using Node.js 4.3 as the runtime.
    • Click next and choose Create Function.

    We can configure our bot to invoke these Lambda functions at two places. We need to do this while configuring the intent, as shown below:

    where botCodeHook and fulfillment are the names of the Lambda functions we created.

    Lambda initialization and validation  

    The Lambda function provided here, i.e., botCodeHook, will be invoked on each user input whose intent is understood by Amazon Lex. It validates bookName against a predefined list of books.

    'use strict';
    exports.handler = (event, context, callback) => {
        const sessionAttributes = event.sessionAttributes;
        const slots = event.currentIntent.slots;
        const bookName = slots.bookName;
      
        // predefined list of available books
        const validBooks = ['harry potter', 'twilight', 'wings of fire'];
      
        // negative check: if the supplied book name is not in our list,
        // ask Lex to re-elicit the slot with a helpful message
        if (bookName && validBooks.indexOf(bookName.toLowerCase()) === -1) {
            let response = { sessionAttributes: event.sessionAttributes,
              dialogAction: {
                type: "ElicitSlot",
                message: {
                  contentType: "PlainText",
                  content: `We do not have book: ${bookName}. Please provide another book name, e.g. twilight.`
                },
                intentName: event.currentIntent.name,
                slots: slots,
                slotToElicit: "bookName"
              }
            };
            callback(null, response);
            // without this return, the Delegate response below would also be sent
            return;
        }
      
        // if valid book name is obtained, send command to choose next course of action
        let response = {sessionAttributes: sessionAttributes,
          dialogAction: {
            type: "Delegate",
            slots: event.currentIntent.slots
          }
        }
        callback(null, response);
    };

    Fulfillment code hook

    This lambda function is invoked after receiving all slot data required to fulfill the intent.

    'use strict';
    
    exports.handler = (event, context, callback) => {
        // when intent get fulfilled, inform lex to complete the state
        let response = {sessionAttributes: event.sessionAttributes,
          dialogAction: {
            type: "Close",
            fulfillmentState: "Fulfilled",
            message: {
              contentType: "PlainText",
              content: "Thanks for purchasing book."
            }
          }
        }
        callback(null, response);
    };

    Error Handling: We can customize the error messages for our bot users. Click on error handling and replace the default values with the required ones. Since the number of retries is two, we can also provide a different message for every retry.

    Your Bot is Now Ready To Chat

    Click on Build to build the chat-bot. Congratulations! Your Lex chat-bot is ready to test. We can test it in the overlay which appears in the Amazon Lex console.

    Sample conversations:

    I hope you have understood the basic terminologies of Amazon Lex along with how to create a simple chat-bot using serverless (Amazon Lambda). This is a really powerful platform to build mature and intelligent chatbots.

  • Getting Started With Golang Channels! Here’s Everything You Need to Know

    We live in a world where speed is important. With cutting-edge technology coming into the telecommunications and software industry, we expect to get things done quickly. We want to develop applications that are fast, can process high volumes of data and requests, and keep the end-user happy. 

    This is great, but of course, it’s easier said than done. That’s why concurrency and parallelism are important in application development. We must process data as fast as possible. Every programming language has its own way of dealing with this, and we will see how Golang does it.  

    Now, many of us choose Golang because of its concurrency support, and the inclusion of goroutines and channels is a big part of that.

    This blog will cover channels and how they work internally, as well as their key components. To benefit the most from this content, it helps to know a little about goroutines and channels, as this blog gets into the internals of channels. If you don’t know anything about them yet, don’t worry: we’ll start with an introduction to channels and then see how they operate.

    What are channels?

    Normally, when we talk about channels, we think of the ones in applications like RabbitMQ, Redis, AWS SQS, and so on, and anyone with little Golang knowledge might assume the same here. But channels in Golang are different from such work-queue systems. In those systems, clients talk to the channels over TCP connections; in Go, a channel is an in-memory data structure, or even a design pattern, which we’ll explain later. So, what exactly are channels in Golang?

    Channels are the medium through which goroutines can communicate with each other. In simple terms, a channel is a pipe that allows a goroutine to either put or read the data. 

    What are goroutines?

    So, a channel is a communication medium for goroutines. Now, let’s give a quick overview of what goroutines are. If you know this already, feel free to skip this section.

    Technically, a goroutine is a function that executes independently in a concurrent fashion. In simple terms, it’s a lightweight thread that’s managed by go runtime. 

    You can create a goroutine by using the go keyword before a function call.

    Let’s say there’s a function called PrintHello, like this:

    func PrintHello() {
       fmt.Println("Hello")
    }

    You can make this into a goroutine simply by calling this function, as below:

    //create goroutine
     go PrintHello()
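    One caveat worth knowing: main does not wait for goroutines, so a bare go PrintHello() may never get to print before the program exits. Below is a minimal runnable sketch of the same example, with the addition of a sync.WaitGroup (an assumption beyond the snippet above) so main waits for the goroutine to finish.

```go
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

// PrintHello is the function from above, instrumented with wg.Done
// so that main can wait for it. Without waiting, main may exit
// before the goroutine ever runs.
func PrintHello() {
	defer wg.Done()
	fmt.Println("Hello")
}

func main() {
	wg.Add(1)
	go PrintHello() // create the goroutine
	wg.Wait()       // block until PrintHello finishes
}
```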

    Now, let’s head back to channels, as that’s the important topic of this blog. 

    How to define a channel?

    Let’s see a syntax that will declare a channel. We can do so by using the chan keyword provided by Go.

    You must specify a data type, as a channel can only carry values of a single type.

    //create channel
     var c chan int

    Very simple! But this is not useful on its own, since it creates a nil channel. Let’s print it and see.

    fmt.Println(c)
    fmt.Printf("Type of channel: %T", c)
    <nil>
    Type of channel: chan int

    As you can see, we have just declared the channel, but we can’t transport data through it. So, to create a useful channel, we must use the make function.

    //create channel
    c := make(chan int)
    fmt.Printf("Type of `c`: %T\n", c)
    fmt.Printf("Value of `c` is %v\n", c)
     
    Type of `c`: chan int
    Value of `c` is 0xc000022120

    As you may notice here, the value of c is a memory address. Keep in mind that channels are nothing but pointers. That’s why we can pass them to goroutines, and we can easily put the data or read the data. Now, let’s quickly see how to read and write the data to a channel.

    Read and write operations on a channel:

    Go provides an easy way to read and write data to a channel by using the left arrow.

    c <- 10

    This is a simple syntax to put the value in our created channel. The same syntax is used to define the “send” only type of channels.

    And to get/read the data from channel, we do this:

    <-c

    This is also the way to define the “receive” only type of channels.
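    To make that concrete, here is a small sketch of directional channel parameters; the function names produce and consume are illustrative, not from the original example. A chan<- int parameter can only be sent to, and a <-chan int parameter can only be received from; the compiler enforces both.

```go
package main

import "fmt"

// produce may only send on its parameter: chan<- int is a send-only view.
func produce(out chan<- int) {
	out <- 1
	out <- 2
	close(out)
}

// consume may only receive: <-chan int is a receive-only view.
func consume(in <-chan int) int {
	sum := 0
	for v := range in { // range ends when the channel is closed
		sum += v
	}
	return sum
}

func main() {
	c := make(chan int)
	go produce(c)
	fmt.Println(consume(c)) // prints 3
}
```

    A plain bidirectional channel converts implicitly to either view, which is why main can pass the same c to both functions.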

    Let’s see a simple program to use the channels.

    func printChannelData(c chan int) {
       fmt.Println("Data in channel is: ", <-c)
    }

    This simple function just prints whatever data is in the channel. Now, let’s see the main function that will push the data into the channel.

    func main() {
       fmt.Println("Main started...")
       //create channel of int
       c := make(chan int)
       // call to goroutine
       go printChannelData(c)
       // put the data in channel
       c <- 10
       fmt.Println("Main ended...")
    }

    This yields the output:

    Main started...
    Data in channel is:  10
    Main ended...

    Let’s talk about the execution of the program. 

    1. We declared a printChannelData function, which accepts a channel c of data type integer. In this function, we are just reading data from channel c and printing it.

    2. Now, the main function will first print “Main started...” to the console.

    3. Then, we created the channel c of data type integer using the make function.

    4. We now pass the channel to the function printChannelData, and as we saw earlier, it’s a goroutine. 

    5. At this point, there are two goroutines. One is the main goroutine, and the other is what we have declared. 

    6. Now, we put 10 into the channel. At this point, our main goroutine is blocked, waiting for some other goroutine to read the data. The reader, in this case, is the printChannelData goroutine, which was itself blocked earlier because there was no data in the channel. Now that we’ve pushed data onto the channel, the Go scheduler (more on this later in the blog) schedules the printChannelData goroutine, which reads and prints the value.

    7. After that, the main goroutine resumes, prints “Main ended...”, and the program stops.

    So, what’s happening here? Basically, the Go scheduler blocks and unblocks goroutines around channel operations. You can’t read from a channel unless there’s data in it, which is why our printChannelData goroutine was blocked in the first place. And on an unbuffered channel, written data has to be read before the sender can resume, which is exactly what blocked our main goroutine.

    With this, let’s see how channels operate internally. 

    Internals of channels:

    Until now, we have seen how to define a goroutine, how to declare a channel, and how to read and write data through a channel with a very simple example. Now, let’s look at how Go handles this blocking and unblocking nature internally. But before that, let’s quickly see the types of channels.

    Types of channels:

    There are two basic types of channels: buffered channels and unbuffered channels. The above example illustrates the behaviour of unbuffered channels. Let’s quickly see the definition of these:

    • Unbuffered channel: This is what we have seen above. An unbuffered channel has no buffer capacity: a send blocks until another goroutine is ready to receive the value. That’s why our main goroutine got blocked when we put data into the channel. 
    • Buffered channel: In a buffered channel, we specify the capacity of the channel. The syntax is very simple: c := make(chan int,10), where the second argument to the make function is the capacity of the channel. So, we can put up to ten elements in the channel without blocking. Once the buffer is full, further sends block until a receiver goroutine starts consuming values.
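    A quick runnable sketch of the buffered case: sends up to the capacity succeed with no receiver ready, and the built-ins len and cap report the queued count and capacity.

```go
package main

import "fmt"

func main() {
	// Capacity 3: up to three sends succeed without a receiver being ready.
	c := make(chan int, 3)
	c <- 10
	c <- 20
	fmt.Println(len(c), cap(c)) // 2 3: two queued values, capacity three

	// Receives drain the buffer in send order.
	fmt.Println(<-c, <-c) // 10 20
}
```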

    Properties of a channel:

    A channel does a lot of things internally, and it holds the properties below:

    • Channels are goroutine-safe.
    • Channels can store and pass values between goroutines.
    • Channels provide FIFO semantics.
    • Channels cause goroutines to block and unblock, which we just learned about. 
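    The first two properties can be demonstrated with a small sketch: several goroutines all send on the same channel with no locking of our own, and nothing is lost. The helper name concurrentSends is illustrative, not from the original text.

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentSends launches several goroutines that all send on the same
// buffered channel, then counts what arrived. The channel's internal
// lock makes the concurrent sends safe without any extra synchronization.
func concurrentSends(goroutines, perGoroutine int) int {
	c := make(chan int, goroutines*perGoroutine)
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				c <- j
			}
		}()
	}
	wg.Wait()
	close(c)

	count := 0
	for range c {
		count++
	}
	return count
}

func main() {
	fmt.Println(concurrentSends(10, 10)) // 100: no sends lost or duplicated
}
```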

    As we walk through the internals of a channel, you’ll see where the first three properties come from.

    Channel Structure:

    As we learned in the definition, a channel is a data structure. Looking at the properties above, we need a mechanism that handles goroutines in a synchronized manner and with FIFO semantics. This can be solved using a queue with a lock, and that is exactly how a channel behaves internally. It has a circular queue, a lock, and some other fields. 
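    To build intuition for "a queue with a lock", here is a deliberately toy model of that idea; this is NOT the real runtime code. The real channel also parks and wakes blocked goroutines, which this sketch replaces with simple full/empty return values, and all names (toyChan, etc.) are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// toyChan is a simplified model of a channel: a circular buffer
// guarded by a mutex. The real runtime also maintains wait queues of
// parked goroutines, which this sketch leaves out.
type toyChan struct {
	mu    sync.Mutex
	buf   []int
	sendx int // next slot to write
	recvx int // next slot to read
	count int // items currently queued
}

func newToyChan(capacity int) *toyChan {
	return &toyChan{buf: make([]int, capacity)}
}

func (t *toyChan) send(v int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.count == len(t.buf) {
		return false // full; a real channel would block the sender here
	}
	t.buf[t.sendx] = v
	t.sendx = (t.sendx + 1) % len(t.buf)
	t.count++
	return true
}

func (t *toyChan) recv() (int, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.count == 0 {
		return 0, false // empty; a real channel would block the receiver here
	}
	v := t.buf[t.recvx]
	t.recvx = (t.recvx + 1) % len(t.buf)
	t.count--
	return v, true
}

func main() {
	c := newToyChan(2)
	c.send(1)
	c.send(2)
	v, _ := c.recv()
	fmt.Println(v) // 1: FIFO order, just like a real channel
}
```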

    When we do this c := make(chan int,10) Go creates a channel using hchan struct, which has the following fields: 

    type hchan struct {
       qcount   uint           // total data in the queue
       dataqsiz uint           // size of the circular queue
       buf      unsafe.Pointer // points to an array of dataqsiz elements
       elemsize uint16
       closed   uint32
       elemtype *_type // element type
       sendx    uint   // send index
       recvx    uint   // receive index
       recvq    waitq  // list of recv waiters
       sendq    waitq  // list of send waiters
     
       // lock protects all fields in hchan, as well as several
       // fields in sudogs blocked on this channel.
       //
       // Do not change another G's status while holding this lock
       // (in particular, do not ready a G), as this can deadlock
       // with stack shrinking.
       lock mutex
    }

    (The struct definition above is taken from the Go runtime source on golang.org.)

    This is what a channel is internally. Let’s see one-by-one what these fields are. 

    qcount holds the count of items/data in the queue. 

    dataqsiz is the size of the circular queue. This is used for buffered channels and corresponds to the second argument passed to the make function.

    elemsize is the size of a single element in the channel.

    buf is the actual circular queue where the data is stored when we use buffered channels.

    closed indicates whether the channel is closed. The syntax to close the channel is close(<channel_name>). The default value of this field is 0, which is set when the channel gets created, and it’s set to 1 when the channel is closed. 
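    The closed flag is observable from user code through the "comma, ok" receive form, which is worth a quick sketch: receives first drain any buffered values, then report the zero value with ok == false once the channel is closed and empty.

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)
	c <- 5
	close(c) // sets the closed flag; further sends would panic

	// A receive first drains any buffered values...
	v, ok := <-c
	fmt.Println(v, ok) // 5 true

	// ...then yields the zero value with ok == false.
	v, ok = <-c
	fmt.Println(v, ok) // 0 false
}
```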

    sendx and recvx indicate the current indices into the buffer, or circular queue. As we add data to a buffered channel, sendx advances, and as we receive data, recvx advances.

recvq and sendq are the waiting queues for blocked goroutines that are trying to read data from or write data to the channel.

lock is a mutex that protects the channel during each read or write operation, so that concurrent goroutines cannot corrupt the channel's state.

These are the important fields of the hchan struct, which comes into the picture when we create a channel. The hchan struct resides on the heap, and the make function gives us a pointer to that location. There's another struct known as sudog that also comes into the picture, but we'll learn more about that later. Now, let's see what happens when we write and read data.

    Read and write operations on a channel:

We are considering buffered channels here. When a goroutine, let's say G1, wants to write data onto a channel, it does the following:

• Acquire the lock: As we saw before, to modify the channel, or the hchan struct, we must acquire the lock. So G1, in this case, will acquire the lock before writing the data.
• Perform the enqueue operation: We know that buf is a circular queue that holds the data. Before enqueuing, the goroutine makes a memory copy of the data and puts the copy into the buffer slot. We will see an example of this.
• Release the lock: After performing the enqueue operation, G1 releases the lock and continues with its further execution.
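The three steps above are not directly observable, but their effect is: a send on a buffered channel returns immediately, without any receiver present, as long as the buffer has space, and the built-in len and cap functions report the qcount and dataqsiz fields respectively. A minimal sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan int, 3) // dataqsiz = 3

	// Each send locks the channel, copies the value into buf, and returns.
	c <- 10
	c <- 20

	fmt.Println(len(c), cap(c)) // 2 3  (qcount and dataqsiz)

	// Receives dequeue in FIFO order, again copying out of buf.
	fmt.Println(<-c, <-c) // 10 20
}
```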

When a goroutine, let's say G2, reads the above data, it performs the same operations, except that instead of an enqueue it performs a dequeue, again with a memory copy. This shows that with channels there is no shared memory: the goroutines share only the hchan struct, which is protected by the mutex; everything else is a copy.

This satisfies the famous Go proverb: "Do not communicate by sharing memory; instead, share memory by communicating."

    Now, let’s look at a small example of this memory copy operation.

func printData(c chan int) {
   data := <-c
   fmt.Println("Data in channel is: ", data)
}

func main() {
   fmt.Println("Main started...")
   var a = 10
   //create a buffered channel
   c := make(chan int, 1)
   fmt.Println("Value of a before putting into channel", a)
   c <- a // the value 10 is copied into the channel's buffer
   a = 20
   fmt.Println("Updated value of a:", a)
   go printData(c)
   time.Sleep(time.Second * 2)
   fmt.Println("Main ended...")
}

And the output of this is:

Main started...
Value of a before putting into channel 10
Updated value of a: 20
Data in channel is:  10
Main ended...

So, as you can see, we put the value of variable a onto the channel and then modified a before the other goroutine read it. However, the value received from the channel stays the same, i.e., 10, because the main goroutine performed a memory copy of the value at the moment of the send. So, even if you change the variable later, the value already in the channel does not change. (Note that this works because the channel carries int values; if we sent a pointer instead, only the pointer would be copied, and both goroutines would still see the same underlying variable.)

    Write in case of buffer overflow:

We’ve seen that a goroutine can add data up to the buffer capacity, but what happens when the buffer is full? When the buffer has no more space and a goroutine, let’s say G1, wants to write data, the Go scheduler blocks/pauses G1, which waits until a receive happens from another goroutine, say G2. Since we are talking about buffered channels, as soon as G2 consumes data and frees up a buffer slot, the Go scheduler makes G1 runnable again. Remember this scenario, as we’ll use G1 and G2 frequently from here onwards.
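A tiny program makes this pause/resume behavior visible. The sleeps here are only to force a readable ordering of events; the blocking itself is done by the scheduler, as described above:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan int, 1)
	c <- 1 // fills the buffer

	go func() {
		// G1: this send blocks, because the buffer is full.
		// The scheduler parks this goroutine until a slot frees up.
		c <- 2
		fmt.Println("G1: send completed")
	}()

	time.Sleep(100 * time.Millisecond) // let G1 reach the blocking send
	fmt.Println("G2: received", <-c)   // frees a slot; G1 becomes runnable
	fmt.Println("G2: received", <-c)   // gets the value G1 sent
	time.Sleep(100 * time.Millisecond) // give G1 time to print
}
```

The first receive returns 1 (the value that filled the buffer), and only then can G1's blocked send of 2 complete.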

We know that goroutines work in a pause-and-resume fashion, but who controls it? As you might have guessed, the Go scheduler does the magic here. There are a few things the Go scheduler does that are very important with respect to goroutines and channels.

    Go Runtime Scheduler

You may already know this, but goroutines are user-space threads. The OS can schedule and manage threads, but doing so is expensive for the OS, considering the properties that threads carry.

    That’s why the Go scheduler handles the goroutines, and it basically multiplexes the goroutines on the OS threads. Let’s see how.

    There are scheduling models, like 1:1, N:1, etc., but the Go scheduler uses the M:N scheduling model.

    Basically, this means that there are a number of goroutines and OS threads, and the scheduler basically schedules the M goroutines on N OS threads. For example:

(Diagram: six goroutines multiplexed across two OS threads, OS Thread 1 and OS Thread 2)

    As you can see, there are two OS threads, and the scheduler is running six goroutines by swapping them as needed. The Go scheduler has three structures as below:

• M: M represents an OS thread, which is entirely managed by the OS; it’s similar to a POSIX thread. M stands for machine.
• G: G represents a goroutine. A goroutine has a resizable stack and also includes information about scheduling, any channel it’s blocked on, etc.
• P: P is a context for scheduling. You can think of it as a logical processor that runs Go code, multiplexing M goroutines onto N OS threads. This is the important part, and that’s why P stands for processor.

    Diagrammatically, we can represent the scheduler as:

(This diagram is referenced from The Go scheduler)

    The P processor basically holds the queue of runnable goroutines—or simply run queues.

So, any time a goroutine (G) wants to run on an OS thread (M), that OS thread first gets hold of a P, i.e., the context. This machinery matters whenever a goroutine needs to be paused and another goroutine must run. One such case is a buffered channel: when the buffer is full, we pause the sender goroutine and activate the receiver goroutine.

Imagine the above scenario: G1 is a sender that tries to send on a full buffered channel, and G2 is the receiver goroutine. When G1 wants to send on the full channel, it calls into the Go runtime scheduler with a gopark call. The scheduler then changes the state of G1 from running to waiting and schedules another goroutine from the run queue, say G2.

    This transition diagram might help you better understand:

As you can see, after the gopark call, G1 is in a waiting state and G2 is running. We haven’t paused the OS thread (M); instead, we’ve blocked the goroutine and scheduled another one, so we get maximum throughput out of the OS thread. The context switching of goroutines is handled by the scheduler, and this is part of what makes the scheduler complex.

This is great. But how do we resume G1, since it still wants to put its data on the channel? Before G1 makes the gopark call, it records its state on the hchan struct, i.e., in our channel’s sendq field. Remember the sendq and recvq fields? They hold the waiting senders and receivers.

Now, G1 stores its state as a sudog struct. A sudog simply represents a goroutine that is waiting on an element. The sudog struct has these fields:

type sudog struct {
       g *g
       isSelect bool
       next *sudog
       prev *sudog
       elem unsafe.Pointer //data element
       ...
    }

g is the waiting goroutine, next and prev are pointers to the next and previous sudog in the wait list (if any), and elem is the actual data element the goroutine is waiting on.

    So, considering our example, G1 is basically waiting to write the data so it will create a state of itself, which we’ll call sudog as below:

    Cool. Now we know, before going into the waiting state, what operations G1 performs. Currently, G2 is in a running state, and it will start consuming the channel data.

As soon as G2 receives the first data/task, it checks the waiting goroutines in the sendq attribute of the hchan struct and finds that G1 is waiting to push data. Here is the interesting part: G2 copies G1’s data from the sudog into the buffer itself, then calls the scheduler, which moves G1 from the waiting state to runnable, adds G1 to the run queue, and returns to G2. This call made on G1’s behalf is known as goready. Impressive, right? Go behaves like this so that when G1 resumes, it doesn’t have to reacquire the lock and push the data itself; that extra work is handled by G2. That’s why the sudog holds both the data and the details of the waiting goroutine. So, the state of G1 looks like this:

As you can see, G1 is placed on a run queue. Now we know what the goroutines and the Go scheduler do in the case of buffered channels. In this example, the sender goroutine came first, but what if the receiver goroutine comes first? What if there’s no data in the channel and the receiver goroutine executes first? The receiver goroutine (G2) will create a sudog in recvq on the hchan struct. Things get a little twisted when the sender goroutine (G1) runs: it first checks whether any goroutine is waiting in recvq, and if there is, it copies the data directly into the waiting goroutine’s (G2’s) memory location, i.e., the elem attribute of the sudog.

This is incredible! Instead of writing to the buffer, the sender writes the data directly into the waiting goroutine’s space, simply to avoid extra work for G2 when it wakes up. We know that each goroutine has its own resizable stack and that goroutines never touch each other’s stacks, except in this channel case. So far, we have seen how sends and receives happen on a buffered channel.

    This may have been confusing, so let me give you the summary of the send operation. 

    Summary of a send operation for buffered channels:

1. Acquire the lock on the entire channel, i.e., the hchan struct.
2. Check if there’s a sudog, i.e., a waiting goroutine, in recvq. If so, put the element directly into its stack. We saw this just now with G1 writing to G2’s stack.
3. If recvq is empty, check whether the buffer has space. If yes, memory-copy the data into the buffer.
4. If the buffer is full, create a sudog under sendq of the hchan struct, holding the currently executing goroutine and the data to put on the channel.

    We have seen all the above steps in detail, but concentrate on the last point. 

This is quite similar to an unbuffered channel. Remember that with unbuffered channels, every receive must be matched with a send, and vice versa.

So, keep in mind that an unbuffered channel always works like a direct send. A summary of read and write operations on an unbuffered channel could be:

• Sender first: At this point there’s no receiver, so the sender creates a sudog of itself, and the receiver later receives the value directly from that sudog.
• Receiver first: The receiver creates a sudog in recvq, and the sender puts the data directly into the receiver’s stack.
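Both cases amount to a direct handoff, which is easy to demonstrate: on an unbuffered channel, a send cannot complete until a receiver is ready, and the value travels straight from sender to receiver. A small sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan string) // unbuffered: every send must rendezvous with a receive

	go func() {
		// Sender first: this goroutine parks itself in sendq (as a sudog)
		// until the receiver below arrives.
		c <- "hello"
	}()

	// The receiver copies the value straight from the waiting sender.
	fmt.Println(<-c) // hello
}
```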

With this, we have covered the basics of channels. We’ve learned how reads and writes operate on buffered and unbuffered channels, and we talked about the Go runtime scheduler.

    Conclusion:

Channels are a very interesting Go topic. They can seem difficult to understand, but once you learn the mechanism, they’re very powerful and help you achieve concurrency in your applications. Hopefully, this blog improves your understanding of the fundamental concepts and operations of channels.

• Benefits of Using Chatbots: How Companies Are Using Them to Their Advantage

Bots are the new black! The entire tech industry seems to be buzzing with “bot” fever. My co-founders and I often see a “bot” company and discuss its business model. Chirag Jog has always been enthusiastic about the bot wave, while I have been mostly pessimistic, especially about B2C bots. We should consider that there are many types of “bots”: chatbots, voice bots, AI assistants, robotic process automation (RPA) bots, conversational agents within apps or websites, etc.

Over the last year, we have been building some interesting chat- and voice-based bots, which has given me some interesting insights. I hope to lay down my thoughts on bots in some detail and with some structure.

    What are bots?

Bots are software programs that automate tasks humans would otherwise do themselves. Bots are developed using machine learning software and are expected to aggregate data to make the interface more intelligent and intuitive. There have always been simple rule-based bots that provide a very specific service with low utility. In the last couple of years, we have seen the emergence of intelligent bots that can serve more complex use-cases.

    Why now?

Machine learning, NLP, and AI technologies have matured, enabling practical applications where bots can actually do intelligent work more than 75% of the time. Has general AI been solved? No. But is the technology good enough to do the simple things well and give hope for more complex things? Yes.

Secondly, there are billions of DAUs on WhatsApp and Facebook Messenger. There are tens of millions of users on enterprise messaging platforms like Slack, Skype, and Microsoft Teams. Startups and enterprises want to use this distribution channel and will continue to experiment aggressively to find relevant use-cases. Millennials are very comfortable using chat and voice interfaces for a broad variety of use-cases, since they have used chat services from the moment they came online. As millennials become a growing part of the workforce, the adoption of bots may increase.

    Thirdly, software is becoming more prevalent and more complex. Data is exploding and making sense of this data is getting harder and requiring more skill. Companies are experimenting with bots to provide an “easy to consume” interface to casual users. So non-experts can use the bot interface while experts can use the mobile or web application for the complex workflows. This is mostly true for B2B & enterprise. A good example is how Slack has become the system of engagement for many companies (including at @velotiotech). We require all the software we use (Gitlab, Asana, Jira, Google Docs, Zoho, Marketo, Zendesk, etc.) to provide notifications into Slack. Over time, we expect to start querying the respective Slack bots for information. Only domain experts will log into the actual SaaS applications.

    Types of Bots

    B2C Chat-Bots

    Consumer focused bots use popular messaging and social platforms like Facebook, Telegram, Kik, WeChat, etc. Some examples of consumer bots include weather, e-commerce, travel bookings, personal finance, fitness, news. These are mostly inspired by WeChat which owns the China market and is the default gateway to various internet services. These bots show up as “contacts” in these messenger platforms.

    Strategically, the B2C bots are basically trying to get around the distribution monopoly of Apple & Google Android. As many studies have indicated, getting mobile users to install apps is getting extremely hard. Facebook, Skype, Telegram hope to become the system of engagement and distribution for various apps thereby becoming an alternate “App Store” or “Bot Store”.

I believe that SMS is a great channel for basic chatbot functionality. Chatbots with an SMS interface can be used by all age groups and in remote parts of the world where data infrastructure is lacking. I do expect to see some interesting companies use SMS chatbots to build new business models. Also, mobile bots that sniff or integrate with as many of your mobile apps as possible to provide cross-platform and cross-app “intelligence” will succeed; Google Now is a good example.

An often-cited example is the DoNotPay chatbot, which helps people contest parking tickets in the UK. In my opinion, the novelty is in the service and its efficiency, not in the chatbot interface as such. Also, I have not met anyone who uses a B2C chatbot even on a weekly or monthly basis.

    B2B Bots

    Enterprise bots are available through platforms and interfaces like Slack, Skype, Microsoft Teams, website chat windows, email assistants, etc. They are focused on collaboration, replacing/augmenting emails, information assistants, support, and speeding up decision-making/communications.

    Most of the enterprise bots solve niche and specific problems. This is a great advantage considering the current state of AI/ML technologies. Many of these enterprise bot companies are also able to augment their intelligence with human agents thereby providing better experiences to users.

    Some of the interesting bots and services in the enterprise space include:

• x.ai and Clara Labs provide virtual assistants that help you set up and manage your meetings.
    • Gong.io and Chorus provide a bot that listens in on sales calls and uses voice-to-text and other machine learning algorithms to help your sales teams get better and close more deals.
    • Astro is building an AI assisted email app which will have multiple interfaces including voice (Echo).
• Twyla is helping to make chatbots on websites more intelligent using ML. It integrates with your existing ZenDesk, LivePerson, or Salesforce support.
    • Clarke.ai is a bot which uses AI to take notes for your meeting so you can focus better.
    • Smacc provides AI assisted automated book-keeping for SMBs.
• Slack is one of the fastest-growing SaaS companies and has the most popular enterprise bot store. Slack bots are great for pushing and pulling information and data. All SaaS services and apps should have bots that can emit useful updates, charts, data, links, etc. to a specific set of users. This is much better than sending emails to an email group. Simple decisions can be taken within a chat interface using something like Slack Buttons. Instead of receiving an email and opening a web page, most people would prefer approving a leave or an expense right within Slack. Slack, Skype, and the like will add the ability to embed “cards” or “webviews” or “interactive sections” within chats. This will enable some more complex use-cases to be served via bots. Most enterprise services have Slack bots and are allowing Slack to become a basic system of engagement.
• Chatbots or even voice-based bots on websites will be a big deal. Imagine that each website has a virtual support rep or sales rep available to you 24×7 in the most popular languages. All businesses would want such “agents” or “bots” for greater sales conversions and better support.
• Automation of back-office tasks can be a HUGE business. KPOs and BPOs are a huge market, so if you can build software or software-enabled processes to reduce costs, you can build a significant-sized company. Some interesting examples here are Automation Anywhere and WorkFusion.

    Voice based Bots

Amazon had a surprise hit in the consumer electronics space with their Amazon Echo device, a voice-based assistant. Google recently released their own voice-enabled products to compete with Echo/Alexa. Voice assistants provide weather, music, search, and e-commerce ordering via an NLP voice interface. Apple’s Siri should have been leading this market, but as usual, Apple is following rather than leading.

Voice bots have one great advantage: with the miniaturization of devices (Apple Watch, earbuds, smaller wearables), the only practical interface is voice. The other option is pairing the device with your mobile phone, which is not a smooth and intuitive process. Echo is already a great device for listening to music with its Spotify integration; just this feature is enough of a reason for most families to buy it.

    Conclusion

    Bots are useful and here to stay. I am not sure about the form or the distribution channel through which bots will become prevalent. In my opinion, bots are an additional interface to intelligence and application workflows. They are not disrupting any process or industry. Consumers will not shop more due to chat or voice interface bots, employees will not collaborate as desired due to bots, information discovery within your company will not improve due to bots. Actually, existing software and SaaS services are getting more intelligent, predictive and prescriptive. So this move towards “intelligent interfaces” is the real disruption.

    So my concluding predictions:

    • B2C chatbots will turn out to be mostly hype and very few practical scalable use-cases will emerge.
    • Voice bots will see increasing adoption due to smaller device sizes. IoT, wearables and music are excellent use-cases for voice based interfaces. Amazon’s Alexa will become the dominant platform for voice controlled apps and devices. Google and Microsoft will invest aggressively to take on Alexa.
    • B2B bots can be intelligent interfaces on software platforms and SaaS products. Or they can be agents that solve very specific vertical use-cases. I am most bullish about these enterprise focused bots which are helping enterprises become more productive or to increase efficiency with intelligent assistants for specific job functions.

    If you’d like to chat about anything related to this article, what tools we use to build bots, or anything else, get in touch.

    PS: Velotio is helping to bring the power of machine learning to enterprise software businesses. Click here to learn more about Velotio.

  • Cleaner, Efficient Code with Hooks and Functional Programming

React Hooks were introduced in 2018, and ever since, numerous POCs have been built around them. Hooks come at a time when React has become the norm and class components are becoming increasingly complex. In this blog, I will showcase how Hooks can reduce the size of your code by up to 90%. Yes, you heard it right. Exciting, isn’t it?

Hooks are a powerful upgrade that arrived with React 16.8 and embrace the functional programming paradigm. React, however, also acknowledges the volume of class components already built, and therefore maintains backward compatibility. You can practice by refactoring a small chunk of your codebase to use React Hooks without impacting the existing functionality.

In this article, I will show you how Hooks can help you write cleaner, smaller, and more efficient code. Remember: 90%!

    First, let’s list out the common problems we all face with React Components as they are today:

1. Huge Components – caused by logic being distributed across lifecycle methods

    2. Wrapper Hell – caused by re-using components

    3. Confusing and hard to understand classes

In my opinion, these are symptoms of one big problem: React does not provide a stateful primitive that is simpler, smaller, and more lightweight than a class component. That is why solving one problem worsens another. For example, if we put all of the logic into one component to fix Wrapper Hell, we get Huge Components that are hard to refactor. On the other hand, if we divide the huge components into smaller reusable pieces, we get more nesting in the component tree, i.e., Wrapper Hell. In either case, there’s always confusion around the classes.

    Let’s approach these problems one by one and solve them in isolation.

    Huge Components –

We have all used lifecycle methods, and often, over time, they accumulate more and more stateful logic. It is also common for stateful logic to be shared across lifecycle methods. For example, suppose you have code that adds an event listener in componentDidMount. The componentDidUpdate method might also contain some logic for setting up the event listeners, and the cleanup code will be written in componentWillUnmount. Notice how the logic for the same concern is split across these lifecycle methods.

    // Class component
    
    import React from "react";
    
    export default class LazyLoader extends React.Component {
      constructor(props) {
        super(props);
    
        this.state = { data: [] };
      }
    
      loadMore = () => {
        // Load More Data
        console.log("loading data");
      };
    
      handleScroll = () => {
    if (!this.props.isLoading && !this.props.isCompleted) {
          this.loadMore();
        }
      };
    
      componentDidMount() {
        this.loadMore();
        document.addEventListener("scroll", this.handleScroll, false);
        // more subscribers and event listeners
      }
    
      componentDidUpdate() {
        //
      }
    
      componentWillUnmount() {
        document.removeEventListener("scroll", this.handleScroll, false);
        // unsubscribe and remove listeners
      }
    
      render() {
        return <div>{this.state.data}</div>;
      }
    }

    React Hooks approach this with useEffect.

    import React, { useEffect, useState } from "react";
    
    export const LazyLoader = ({ isLoading, isCompleted }) => {
      const [data, setData] = useState([]);
    
      const loadMore = () => {
        // Load and setData here
      };
    
      const handleScroll = () => {
    if (!isLoading && !isCompleted) {
          loadMore();
        }
      };
    
      // cDM and cWU
      useEffect(() => {
        document.addEventListener("scroll", handleScroll, false);
        // more subscribers and event listeners
    
        return () => {
          document.removeEventListener("scroll", handleScroll, false);
          // unsubscribe and remove listeners
        };
      }, []);
    
      // cDU
      useEffect(() => {
        //
      }, [/** dependencies */]);
    
      return data && <div>{data}</div>;
    };

    Now, let’s move the logic to a custom Hook.

    import { useEffect, useState } from "react";
    
export function useScroll(isLoading, isCompleted) {
      const [data, setData] = useState([]);
    
      const loadMore = () => {
        // Load and setData here
      };
    
      const handleScroll = () => {
    if (!isLoading && !isCompleted) {
          loadMore();
        }
      };
    
      // cDM and cWU
      useEffect(() => {
        document.addEventListener("scroll", handleScroll, false);
        // more subscribers and event listeners
    
        return () => {
          document.removeEventListener("scroll", handleScroll, false);
          // unsubscribe and remove listeners
        };
      }, []);
    
      return data;
    };

    import React, { useEffect } from "react";
    import { useScroll } from "./useScroll";
    
    const LazyLoader = ({ isLoading, isCompleted }) => {
  const data = useScroll(isLoading, isCompleted);
    
      // cDU
      useEffect(() => {
        //
      }, [/** dependencies */]);
    
      return data && <div>{data}</div>;
    };

useEffect puts code that changes together in one place, making it more readable and easier to understand. You can also write multiple useEffects; the advantage, again, is to separate out mutually unrelated code.

    Wrapper Hell –

If you’re well versed in React, you probably know it doesn’t provide a way to attach reusable behavior to a component (like “connect” in react-redux). React solves this problem of sharing logic with the render props and higher-order component patterns. But using these patterns requires restructuring your components, which is hard to follow and, at times, cumbersome, and it typically leads to a problem called Wrapper Hell. You can see this by looking at an application in React DevTools: components are wrapped in layers of providers, consumers, HOCs, and other abstractions. Because of this, React needed a better way of sharing logic.

The code below is inspired by the React Conf 2018 talk, “90% Cleaner React with Hooks.”

    import React from "react";
    import Media from "./components/Media";
    
    function App() {
      return (
        <Media query="(max-width: 480px)">
          {small => (
            <Media query="(min-width: 1024px)">
              {large => (
                <div className="media">
                  <h1>Media</h1>
                  <p>{small ? "small screen" : "not a small screen"}</p>
                  <p>{large ? "large screen" : "not a large screen"}</p>
                </div>
              )}
            </Media>
          )}
        </Media>
      );
    }
    
    export default App;

    import React from "react";
    
    export default class Media extends React.Component {
      removeListener = () => null;
    
      constructor(props) {
        super(props);
        this.state = {
          matches: window.matchMedia(this.props.query).matches
        };
      }
    
      componentDidMount() {
        this.init();
      }
    
      init() {
        const media = window.matchMedia(this.props.query);
        if (media.matches !== this.state.matches) {
          this.setState({ matches: media.matches });
        }
    
        const listener = () => this.setState({ matches: media.matches });
        media.addListener(listener);
        this.removeListener = () => media.removeListener(listener);
      }
    
      componentDidUpdate(prevProps) {
        if (prevProps.query !== this.props.query) {
          this.removeListener();
          this.init();
        }
      }
    
      componentWillUnmount() {
        this.removeListener();
      }
    
      render() {
        return this.props.children(this.state.matches);
      }
    }

    We can check the below example to see how Hooks fix this problem.

    import { useState, useEffect } from "react";
    
    export default function(query) {
      let [matches, setMatches] = useState(window.matchMedia(query).matches);
    
      useEffect(() => {
        let media = window.matchMedia(query);
        if (media.matches !== matches) {
          setMatches(media.matches);
        }
        const listener = () => setMatches(media.matches);
        media.addListener(listener);
        return () => media.removeListener(listener);
      }, [query, matches]);
    
      return matches;
    }

    import React from "react";
    import useMedia from "./hooks/useMedia";
    
    function App() {
      let small = useMedia("(max-width: 480px)");
      let large = useMedia("(min-width: 1024px)");
      return (
        <div className="media">
          <h1>Media</h1>
          <p>{small ? "small screen" : "not a small screen"}</p>
          <p>{large ? "large screen" : "not a large screen"}</p>
        </div>
      );
    }
    
    export default App;

    Hooks provide you with a way to extract a reusable stateful logic from a component without affecting the component hierarchy. This enables it to be tested independently.

    Confusing and hard to understand classes

Classes pose more problems than they solve. We’ve known React for a very long time, and there’s no denying that classes are hard for humans as well as for machines. They confuse both. Here’s why:

    For Humans –

    1. There’s a fair amount of boilerplate when defining a class.

2. Beginners and even expert developers find it difficult to bind methods and to write class components.

3. People often can’t decide between functional and class components, since over time they might need state.

    For Machines –

    1. In the minified version of a component file, the method names are not minified and the unused methods are not stripped out, as it’s not possible to tell how all the methods fit together.

    2. Classes make it difficult for React to implement hot loading reliably.

    3. Classes encourage patterns that make it difficult for the compiler to optimize.

Due to the above problems, classes can be a large barrier to learning React. To keep React relevant, the community has been experimenting with component folding and Prepack, but classes make these optimizations fall back to a slower path. Hence, the community wanted an API that makes it more likely for code to stay on the optimizable path.

React components have always been closer to functions. And since Hooks introduce stateful logic into functional components, they let you use more of React’s features without classes. Hooks embrace functions without compromising the practical spirit of React, and they don’t require you to learn complex functional or reactive programming techniques.

    Conclusion –

React Hooks got me excited, and I am learning new things every day. Hooks are a way to write far less code for the same use case. Also, Hooks do not ask developers who are already busy shipping to rewrite everything. You can redo small components with Hooks and slowly move to the complex components later.

The thinking process in Hooks is meant to be gradual. I hope this blog makes you want to get your hands dirty with Hooks. Do share your thoughts and experiences with Hooks. Finally, I would strongly recommend the official documentation, which has great content.

    Recommended Reading: React Today and Tomorrow and 90% cleaner React with Hook

  • Automation Testing with Nightwatch JS and Cucumber: Everything You Need to Know

    What is Nightwatch JS?

Nightwatch.js is a test automation framework for web applications, developed in Node.js, which uses the W3C WebDriver API (formerly Selenium WebDriver). It is a complete end-to-end testing solution that aims to simplify writing automated tests and setting up Continuous Integration. Nightwatch works by communicating over a RESTful HTTP API with a WebDriver server (such as ChromeDriver or Selenium Server). The latest version available at the time of writing is 1.0.

    Why Use Nightwatch JS Over Any Other Automation Tool?

Selenium is in high demand for developing automation frameworks, since it supports various programming languages, provides cross-browser testing, and is used in both web application and mobile application testing.

But Nightwatch, built on Node.js, uses JavaScript exclusively as the programming language for end-to-end testing, which has the following advantages –

    • Lightweight framework
    • Robust configuration
    • Integrates with cloud servers like SauceLabs and Browserstack for web and mobile testing with JavaScript, Appium
    • Allows configuration with Cucumber to build a strong BDD (Behaviour Driven Development) setup
    • High performance of the automation execution
    • Improves test structuring
    • Minimal code and less maintenance

    Installation and Configuration of Nightwatch Framework

To configure the Nightwatch framework, all you need in your system is the following –

    • Download latest Node.js
    • npm (bundled with Node.js); project dependencies are installed with
    $ npm install

    • Create a package.json file for the test settings and dependencies
    $ npm init

    • Install nightwatch and save as dev dependency
    $ npm install nightwatch --save-dev

    • Install chromedriver/geckodriver and save as dev dependency for running the execution on the required browser
    $ npm install chromedriver --save-dev

    {
      "name": "demotest",
      "version": "1.0.0",
      "description": "Demo Practice",
      "main": "index.js",
      "scripts": {
        "test": "nightwatch"
      },
      "author": "",
      "license": "ISC",
      "devDependencies": {
        "chromedriver": "^74.0.0",
        "nightwatch": "^1.0.19"
      }
    }

    • Create a nightwatch.conf.js for webdriver and test settings with nightwatch
    const chromedriver = require('chromedriver');
    
    module.exports = {
      src_folders : ["tests"], //tests is a folder in workspace which has the step definitions
      test_settings: {
        default: {
          webdriver: {
            start_process: true,
            server_path: chromedriver.path,
            port: 4444,
            cli_args: ['--port=4444']
          },
          desiredCapabilities: {
            browserName: 'chrome'
          }
        }
      }
    };

    Using Nightwatch – Writing and Running Tests

    We create a JavaScript file named demo.js for running a test through nightwatch with the command

    $ npm test

    //demo.js is a JS file under tests folder
    module.exports = {
        'step one: navigate to google' : function (browser) { //step one
          browser
            .url('https://www.google.com')
            .waitForElementVisible('body', 1000)
            .setValue('input[type=text]', 'nightwatch')
            .waitForElementVisible('input[name=btnK]', 1000)
        },
      
        'step two: click input' : function (browser) { //step two
          browser
            .click('input[name=btnK]')
            .pause(1000)
            .assert.containsText('#main', 'Night Watch')
            .end(); //to close the browser session after all the steps
  }
};

When run, this command picks up the value "nightwatch" from the "test" key in the package.json file, which invokes the Nightwatch runner and opens the URL in ChromeDriver.

There can be one or more steps in the demo.js (step definition) file, as per the requirement or test cases.

    Also, it is a good practice to maintain a separate .js file for page objects which consists of the locator strategy and selectors of the UI web elements.

    module.exports = {
        elements: {
          googleInputBox: '//input[@type="text"]',
          searchButton: '(//input[@value="Google Search"])[2]',
          headingText: `//h3[contains(text(),'Nightwatch.js')]`
        }
    }

The locator strategy is resolved to CSS or XPath when inspecting the UI elements.

locateStrategy: function (selector) { return selector.startsWith('/') ? 'xpath' : 'css selector'; }
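As a quick sanity check, this selector-to-strategy mapping can be exercised with plain Node. The standalone function below mirrors the snippet above, plus an extra check (an assumption on our part, not in the original snippet) for grouped XPath selectors such as the searchButton defined earlier:

```javascript
// Choose the Nightwatch locate strategy from the selector's shape:
// XPath expressions start with "/" (or "(" for grouped XPath);
// everything else is treated as a CSS selector.
function locateStrategy(selector) {
  return selector.startsWith('/') || selector.startsWith('(')
    ? 'xpath'
    : 'css selector';
}

console.log(locateStrategy('//input[@type="text"]'));          // xpath
console.log(locateStrategy('(//input[@value="Search"])[2]'));  // xpath
console.log(locateStrategy('input[name=btnK]'));               // css selector
```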

    Nightwatch.conf.js file is also updated with the page_objects location.

    const chromedriver = require('chromedriver');
    
    module.exports = {
      src_folders : ["tests"], //tests is a folder in workspace which has the step definitions
      page_objects_path: 'page_objects/', //page_objects folder where selectors are saved
      test_settings: {
        default: {
          webdriver: {
            start_process: true,
            server_path: chromedriver.path,
            port: 4444,
            cli_args: ['--port=4444']
          },
          desiredCapabilities: {
            browserName: 'chrome'
          }
        }
      }
    };

    Nightwatch and Cucumber JS

Cucumber is a tool that supports Behaviour Driven Development (BDD) and allows you to write tests in plain English in the Given, When, Then format.

    • It is helpful to involve business stakeholders who can’t easily read code
    • Cucumber testing focuses on covering the UI scenarios from end-user’s perspective
    • Reuse of code is easily possible
    • Quick set up and execution
    • Efficient tool for UI testing

    We add cucumber as dev dependency in the code.

    $ npm install --save-dev nightwatch-api nightwatch cucumber chromedriver cucumber-pretty

    {
      "name": "nightwatchdemo",
      "version": "1.0.0",
      "description": "To learn automation by nightwatch",
      "main": "google.js",
      "scripts": {
        "test": "nightwatch",
    "e2e-test": "cucumber-js --require cucumber.conf.js --require tests --format node_modules/cucumber-pretty"
      },
      "author": "",
      "license": "ISC",
      "dependencies": {
        "cucumber": "^5.1.0",
        "cucumber-pretty": "^1.5.0"
      },
      "devDependencies": {
        "chromedriver": "^2.40.0",
        "nightwatch": "^1.0.19",
        "nightwatch-api": "^2.2.0"
      }
    }

Cucumber can be configured in the Nightwatch framework to help maintain the test scenarios in its .feature files. We create a file cucumber.conf.js in the root folder which handles starting, creating and closing WebDriver sessions.

    const { setDefaultTimeout, AfterAll, BeforeAll } = require('cucumber');
    const { createSession, closeSession, startWebDriver, stopWebDriver } = require('nightwatch-api');
    
    setDefaultTimeout(60000);
    
    BeforeAll(async () => {
      await startWebDriver();
      await createSession();
    });
    
    AfterAll(async () => {
      await closeSession();
      await stopWebDriver();
    });

    Then we create a feature file which has the test scenarios in Given, When, Then format.  

    Feature: Google Search
    
    Scenario: Searching Google
    
      Given I open Google's search page
      Then the title is "Google"
      And the Google search form exists

For Cucumber to be able to understand and execute the feature file, we need to create matching step definitions for every feature step we use in our feature file. Create a step definition file under the tests folder called google.js. Step definitions that use the Nightwatch client should return the result of the API call, as it returns a Promise. For example,

Given(/^I open Google's search page$/, () => {
  return client
    .url('http://google.com')
    .waitForElementVisible('body', 1000);
});

    OR

Given(/^I open Google's search page$/, async () => {
  await client
    .url('http://google.com')
    .waitForElementVisible('body', 1000);
});

    const { client } = require('nightwatch-api');
    const { Given, Then, When } = require('cucumber');
    
    Given(/^I open Google's search page$/, () => {
      return client.url('http://google.com').waitForElementVisible('body', 1000);
    });
    
    Then(/^the title is "([^"]*)"$/, title => {
      return client.assert.title(title);
    });
    
    Then(/^the Google search form exists$/, () => {
      return client.assert.visible('input[name="q"]');
    });
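The quoted-string pattern in the step definitions above is an ordinary JavaScript regular expression; its capture group is what Cucumber passes to the callback as `title`. This can be checked with plain Node:

```javascript
// The same pattern Cucumber matches against the feature step text:
const pattern = /^the title is "([^"]*)"$/;

const step = 'the title is "Google"';
const match = step.match(pattern);

// match[0] is the whole step; match[1] is the first capture group.
console.log(match[1]); // Google
```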

    $ npm run e2e-test

    Executing Individual Feature Files or Scenarios

    • Single feature file
    npm run e2e-test -- features/file1.feature

    • Multiple feature files
    npm run e2e-test -- features/file1.feature features/file2.feature

    • Scenario by its line number
    npm run e2e-test -- features/my_feature.feature:3

    • Feature directory
    npm run e2e-test -- features/dir

    • Scenario by its name matching a regular expression
    npm run e2e-test -- --name "topic 1"

    Feature and Scenario Tags

Cucumber allows you to add tags to features or scenarios, and we can selectively run a scenario using those tags. The tags can also be used with conditional operators, depending on the requirement.

    • Single tag
# google.feature
@google
Feature: Google Search

  @search
  Scenario: Searching Google
    Given I open Google's search page
    Then the title is "Google"
    And the Google search form exists

    npm run e2e-test -- --tags @google

    • Multiple tags
    npm run e2e-test -- --tags "@google or @duckduckgo"
    
    npm run e2e-test -- --tags "(@google or @duckduckgo) and @search"

    • To skip tags
    npm run e2e-test -- --tags "not @google"
    
    npm run e2e-test -- --tags "not(@google or @duckduckgo)"

    Custom Reporters in Nightwatch and Cucumber Framework

Reporting is another advantage provided by Cucumber: it generates a report of test results at the end of the execution, which provides an immediate visual clue of a possible problem and simplifies the debugging process. HTML reports are best suited and easy to understand due to their format. To generate them, we add cucumber-html-reporter as a dev dependency.

    $ npm install --save-dev cucumber-html-reporter mkdirp

cucumber-html-reporter in node_modules manages the creation of reports and generates them in the output location after the test execution. The screenshot feature can be enabled by adding the below code snippet in nightwatch.conf.js

module.exports = {
  test_settings: {
    default: {
      screenshots: {
        enabled: true,
        path: 'screenshots'
      }
    }
  }
};

The Cucumber configuration file can be extended to handle screenshots and attach them to the report. It also enables generating an HTML test report at the end of the execution. The HTML report is generated from Cucumber's built-in JSON report and can be styled using different templates. We use a setTimeout() block in our cucumber.conf.js to run the generation after Cucumber finishes creating the JSON report.

    const fs = require('fs');
    const path = require('path');
    const { setDefaultTimeout, After, AfterAll, BeforeAll } = require('cucumber');
    const { createSession, closeSession, startWebDriver, stopWebDriver } = require('nightwatch-api');
    const reporter = require('cucumber-html-reporter');
    
    const attachedScreenshots = getScreenshots();
    
    function getScreenshots() {
      try {
        const folder = path.resolve(__dirname, 'screenshots');
    
        const screenshots = fs.readdirSync(folder).map(file => path.resolve(folder, file));
        return screenshots;
      } catch (err) {
        return [];
      }
    }
    
    setDefaultTimeout(60000);
    
    BeforeAll(async () => {
      await startWebDriver({ env: process.env.NIGHTWATCH_ENV || 'chromeHeadless' });
      await createSession();
    });
    
    AfterAll(async () => {
      await closeSession();
      await stopWebDriver();
      setTimeout(() => {
        reporter.generate({
          theme: 'bootstrap',
          jsonFile: 'report/cucumber_report.json',
          output: 'report/cucumber_report.html',
          reportSuiteAsScenarios: true,
          launchReport: true,
          metadata: {
            'App Version': '0.3.2',
            'Test Environment': 'POC'
          }
        });
      }, 0);
    });
    
    After(function() {
      return Promise.all(
        getScreenshots()
          .filter(file => !attachedScreenshots.includes(file))
          .map(file => {
            attachedScreenshots.push(file);
            return this.attach(fs.readFileSync(file), 'image/png');
          })
      );
    });

In the package.json file, we have added the JSON formatter to create a JSON report, which cucumber-html-reporter consumes. We use mkdirp to make sure the report folder exists before running the test.

"scripts": {
  "e2e-test": "mkdirp report && cucumber-js --require cucumber.conf.js --require step-definitions --format node_modules/cucumber-pretty --format json:report/cucumber_report.json"
}

    After adding this, run the command again

    npm run e2e-test

    When the test run completes, the HTML report is displayed in a new browser tab in the format given below

    Conclusion

Nightwatch-Cucumber is a great combination, linking the accessibility of Cucumber.js with the robust testing framework of Nightwatch.js. Together they provide not only easily readable documentation of the test suite, but also highly configurable automated user tests, all while keeping everything in JavaScript.

  • How To Get Started With Logging On Kubernetes?

    In distributed systems like Kubernetes, logging is critical for monitoring and providing observability and insight into an application’s operations. With the ever-increasing complexity of distributed systems and the proliferation of cloud-native solutions, monitoring and observability have become critical components in knowing how the systems are functioning.

    Logs don’t lie! They have been one of our greatest companions when investigating a production incident.

    How is logging in Kubernetes different?

    Log aggregation in Kubernetes differs greatly from logging on traditional servers or virtual machines, owing to the way it manages its applications (pods).

    When an app crashes on a virtual machine, its logs remain accessible until they are deleted. When pods are evicted, crashed, deleted, or scheduled on a different node in Kubernetes, the container logs are lost. The system is self-cleaning. As a result, you are left with no knowledge of why the anomaly occurred. Because default logging in Kubernetes is transient, a centralized log management solution is essential.

    Kubernetes is highly distributed and dynamic in nature; hence, in production, you’ll most certainly be working with multiple machines that have multiple containers each, which can crash at any time. Kubernetes clusters add to the complexity by introducing new layers that must be monitored, each of which generates its own type of log.

    We’ve curated some of the best tools to help you achieve this, alongside a simple guide on how to get started with each of them, as well as a comparison of these tools to match your use case.

    PLG Stack

    Introduction:

    Promtail is an agent that ships the logs from the local system to the Loki cluster.

    Loki is a horizontally scalable, highly available, multi-tenant log aggregation system inspired by Prometheus. It indexes only metadata and doesn’t index the content of the log. This design decision makes it very cost-effective and easy to operate.

Grafana is the visualisation tool, which consumes data from the Loki data source.

Loki is like Prometheus, but for logs: it prefers a multidimensional, label-based approach to indexing, and aims to be a single-binary, easy-to-operate system with no dependencies. Loki differs from Prometheus by focusing on logs instead of metrics, and by delivering logs via push instead of pull.
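To make that design decision concrete, here is a toy model in plain JavaScript (all labels and log lines are made up): streams are indexed only by their label set, and the log content itself is scanned at query time.

```javascript
// Index maps a serialized label set to its stream of raw log lines.
// Only the labels are "indexed"; content is grepped on demand.
const index = new Map([
  ['{job="app",env="prod"}', ['started', 'error: disk full']],
  ['{job="app",env="dev"}',  ['started']],
]);

// A query = label selector (cheap index lookup)
//         + optional content filter (linear scan of the stream).
function query(labels, contains) {
  const stream = index.get(labels) || [];
  return contains ? stream.filter(line => line.includes(contains)) : stream;
}

console.log(query('{job="app",env="prod"}', 'error')); // [ 'error: disk full' ]
```

Keeping the index small (labels only) is what makes this design cost-effective compared to full-text indexing.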

    Configuration Options:

    Installation with Helm chart –

    # Create a namespace to deploy PLG stack :
    
    kubectl create ns loki
    
    # Add Grafana's Helm Chart repository and Update repo :
    
    helm repo add grafana https://grafana.github.io/helm-charts
    helm repo update
    
    # Deploy the Loki stack :
    
    helm upgrade --install loki-stack grafana/loki-stack -n loki --set grafana.enabled=true
    
    # Retrieve password to log into Grafana with user admin
    
kubectl get secret loki-stack-grafana -n loki -o jsonpath="{.data.admin-password}" | base64 --decode ; echo
    
    # Finally execute command below to access the Grafana UI on http://localhost:3000
    
    kubectl port-forward -n loki service/loki-stack-grafana 3000:80

    Log in with user name “admin” and the password you retrieved earlier.

    Query Methods:

    Using CLI :

    Curl command to fetch logs directly from Loki

curl -G -s "http://localhost:3100/loki/api/v1/query" \
  --data-urlencode 'query={job="shruti/logging-golang"}' | jq

    Using LogQL :

    • LogQL provides the functionality to filter logs through operators.

    For example :

    {container="kube-apiserver"} |= "error" != "timeout"

    • LogCLI is the command-line interface to Grafana Loki. It facilitates running LogQL queries against a Loki instance.

    For example :

    logcli query '{job="shruti/logging-golang"}'
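The line-filter operators shown above (`|=` keeps lines containing a string, `!=` drops them) behave like a chain of ordinary filters. A plain JavaScript illustration over made-up log lines:

```javascript
// Equivalent of: {container="kube-apiserver"} |= "error" != "timeout"
const lines = [
  'error: connection refused',
  'error: request timeout',
  'info: request served',
];

const result = lines
  .filter(line => line.includes('error'))     // |= "error"
  .filter(line => !line.includes('timeout')); // != "timeout"

console.log(result); // [ 'error: connection refused' ]
```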

    Using Dashboard :

Click on the Explore tab on the left side, then select Loki from the data source dropdown.

    EFK Stack

    Introduction :

    The Elastic Stack contains most of the tools required for log management

    • Elasticsearch is an open source, distributed, RESTful and scalable search engine. It is a NoSQL database, used primarily to store the logs shipped by Fluentd and retrieve them on demand.
    • Log shippers such as Logstash, Fluentd and Fluent Bit. Fluentd is an open source log collection agent which supports multiple data sources and output formats. It can forward logs to solutions like Stackdriver, CloudWatch, Splunk, BigQuery, etc.
    • Kibana as the UI tool for querying, data visualisation and dashboards. Virtually any type of dashboard can be built using Kibana. The Kibana Query Language (KQL) is used for querying Elasticsearch data.
    • Fluentd ➖ Deployed as a DaemonSet, as it needs to collect the container logs from all the nodes. It connects to the Elasticsearch service endpoint to forward the logs.
    • Elasticsearch ➖ Deployed as a StatefulSet, as it holds the log data. A service endpoint is also exposed for Fluentd and Kibana to connect to it.
    • Kibana ➖ Deployed as a Deployment and connects to the Elasticsearch service endpoint.

    Configuration Options :

It can be installed through a Helm chart as a stack or as individual components.

    • Add the Elastic Helm charts repo:
     helm repo add elastic https://helm.elastic.co && helm repo update

    • More information related to deploying these Helm Charts can be found here

After the installation is complete and the Kibana dashboard is accessible, we need to define index patterns to be able to see logs in Kibana.

From the homepage, type Kibana / Index Patterns into the search bar. Go to the Index Patterns page and click Create index pattern in the right corner. You will see the list of index patterns here.

Add the required patterns to your indices, then from the left side menu, click Discover and check your logs 🙂

    Query Methods :

    • Elastic Search can be queried directly on any indices.
    • For Example –
    curl -XGET 'localhost:9200/my_index/my_type/_count?q=field:value&pretty'

    More information can be found here  https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html

    Using Dashboard :

    Graylog Stack

    Introduction

Graylog is a leading centralised log management solution, built to open standards for capturing, storing, and enabling real-time analysis of terabytes of machine data. It supports a master-slave architecture. The Graylog stack — Graylog v3 and Elasticsearch v6, along with MongoDB v3.

    Graylog is an open-source log management tool, using Elasticsearch as its storage. Unlike the ELK stack, which is built from individual components (Elasticsearch, Logstash, Kibana), Graylog is built as a complete package that can do everything.

    • One package with all the essentials of log processing: collect, parse, buffer, index, search, analyze
    • Additional features that you don’t get with the open-source ELK stack, such as role-based access control and alerts
    • Fits the needs of most centralized log management use-cases in one package
    • Easily scale both the storage (Elasticsearch) and the ingestion pipeline
    • Graylog’s extractors allow you to extract fields out of log messages using many methods, such as grok expressions, regex and JSON

    Cons :

    • Visualization capabilities are limited, at least compared to ELK’s Kibana
    • You can’t use the whole ELK ecosystem, because Graylog doesn’t directly expose the Elasticsearch API; instead, Graylog has its own API
    • It is not implemented for Kubernetes distributions directly; rather, it supports logging via Fluent Bit/Logstash/Fluentd

    Configurations Options

Graylog is very flexible in that it supports multiple inputs (data sources), among which we can mention:

    • GELF TCP.
    • GELF Kafka.
    • AWS Logs.

as well as outputs (how Graylog nodes can forward messages), among which we can mention:

    • GELF Output.
    • STDOUT (queried via the HTTP/REST API).

    Connecting External GrayLog Stack:

    • Use the host and the TCP input port (12201) to push logs to the Graylog stack directly

    Query Methods

    Using CLI:

    curl -u admin:password -H 'X-Requested-By: cli' "http://GRAYLOG_IP_OR_HOSTNAME/api/search/universal/relative?query=*&range=3600&limit=100&sort=timestamp:desc&pretty=true" -H "Accept: application/json" -H "Content-Type: application/json"

    Where:

    • query=* – replace * with your desired string
    • range=3600 – replace 3600 with time range (in seconds)
    • limit=100 – replace 100 with number of returned results
    • sort=timestamp:desc – replace timestamp:desc with the field you want to sort by
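The same query parameters can be assembled programmatically; a small Node sketch using the standard WHATWG URL API (the host placeholder and the example query value are ours, not from the Graylog docs):

```javascript
// Build the Graylog relative-search URL from its query parameters.
const url = new URL('http://GRAYLOG_IP_OR_HOSTNAME/api/search/universal/relative');
url.searchParams.set('query', '*');             // search string
url.searchParams.set('range', '3600');          // time range in seconds (last hour)
url.searchParams.set('limit', '100');           // max number of returned results
url.searchParams.set('sort', 'timestamp:desc'); // newest first

console.log(url.pathname); // /api/search/universal/relative
```

searchParams.set takes care of URL-encoding each value, so special characters in the search string are handled automatically.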

    Using Dashboard:

    One can easily navigate the filter section and perform search with the help of labels generated by log collectors.

    Splunk Stack

    Introduction

    Splunk is used for monitoring and searching through big data. It indexes and correlates information in a container that makes it searchable, and makes it possible to generate alerts, reports and visualisations.

    Configuration Options

    1. Helm based Installation as well as Operator based Installation is supported

    2. Splunk Connect for Kubernetes provides a way to import and search your Kubernetes logging, object, and metrics data in your Splunk platform deployment. Splunk Connect for Kubernetes supports importing and searching your container logs on ECS, EKS, AKS, GKE and Openshift

    3. Splunk Connect for Kubernetes supports installation using Helm.

    4. Splunk Connect for Kubernetes deploys a DaemonSet on each node. And in the DaemonSet, a Fluentd container runs and does the collecting job. Splunk Connector for Kubernetes collects three types of data – Logs, Objects and Metrics

    5. We need a minimum of two Splunk platform indexes

    One events index, which will handle logs and objects (you may also create two separate indexes for logs and objects).

    One metrics index. If you do not configure these indexes, Kubernetes Connect for Splunk uses the defaults created in your HTTP Event Collector (HEC) token.

    6. An HEC token will be required, before moving on to installation

    7. To install and configure defaults with Helm :

    Add Splunk chart repo

helm repo add splunk https://splunk.github.io/splunk-connect-for-kubernetes/

Fetch the default values file into your working directory and prepare it.

    helm show values splunk/splunk-connect-for-kubernetes > values.yaml

Once you have a values file, you can simply install the chart by running

    helm install my-splunk-connect -f values.yaml splunk/splunk-connect-for-kubernetes

    To learn more about using and modifying charts, see: https://github.com/splunk/splunk-connect-for-kubernetes/tree/main/helm-chart

    The values file for logging

    Query Methods

    Using CLI :

curl --location -k --request GET 'https://localhost:8089/services/search/jobs/export?search=search%20index=%22event%22%20sourcetype=%22kube:container:docker-log-generator%22&output_mode=json' -u admin:Admin123!

    Using Dashboard :

    Logging Stack

    Comparison of Tools

Some other interesting tools are not open source, but are too good not to talk about, and offer end-to-end functionality for all your logging needs:

    Sumo Logic :

    This log management tool can store logs as well as metrics. It has a powerful search syntax, where you can define operations similarly to UNIX pipes.

    • Powerful query language
    • Capability to detect common log patterns and trends
    • Centralized management of agents
    • Supports Log Archival & Retention
    • Ability to perform Audit Trails and Compliance Tracking

    Configuration Options :

    • A subscription to Sumo Logic will be required
    • Helm installation
    • Provides options to install side-by-side existing Prometheus Operator

    More information can be found here!

    Cons :

    • Performance can be bad for searches over large data sets or long timeframes.
    • Deployment only available on Cloud, SaaS, and Web-Based
    • Expensive – Pricing is per ingested byte, so it forces you to pick and choose what you log, rather than ingesting everything and figuring it out later

    Datadog:

    Datadog is a SaaS that started up as a monitoring (APM) tool and later added log management capabilities as well.

You can send logs via HTTP(S) or syslog, either via existing log shippers (rsyslog, syslog-ng, Logstash, etc.) or through Datadog's own agent. With it, you can observe your logs in real time using the Live Tail, without indexing them. You can also ingest all of the logs from your applications and infrastructure, decide what to index dynamically with filters, and then store them in an archive.

It features Logging without Limits™, which is a double-edged sword: it's harder to predict and manage costs, but you get pay-as-you-use pricing combined with the fact that you can archive logs and restore them from the archive.

    • Log processing pipelines have the ability to process millions of logs per minute or petabytes per month seamlessly.
    • Automatically detects common log patterns
    • Can archive logs to AWS/Azure/Google Cloud storage and rehydrate them later
    • Easy search with good autocomplete (based on facets)
    • Integration with Datadog metrics and traces
    • Affordable, especially for short retention and/or if you rely on the archive for a few searches going back

    Configuration options :

    Using CLI :

curl -X GET "https://api.datadoghq.com/api/v2/logs/events" -H "Content-Type: application/json" -H "DD-API-KEY: ${DD_API_KEY}" -H "DD-APPLICATION-KEY: ${DD_APP_KEY}"

    Cons :

    • Not available on premises
    • It is a bit complicated to set up for the first time. It is not easy at first to discover all the features Datadog has; the interface is tricky and can be a hindrance at times. In addition, if application fields are not mapped correctly, filters are not very useful.
    • Datadog per host pricing can be very expensive.

    Conclusion :

    As one can see, each software has its own benefits and downsides. Grafana’s Loki is more lightweight than Elastic Stack in overall performance, supporting Persistent Storage Options.

    That being said, the right solution platform really depends on each administrator’s needs.

    That’s all! Thank you.

    If you enjoyed this article, please like it.

    Feel free to drop a comment too.

  • Demystifying High Availability in Kubernetes Using Kubeadm

    Introduction

The rise of containers has reshaped the way we develop, deploy and maintain software. Containers allow us to package the different services that constitute an application into separate containers, and to deploy those containers across a set of virtual and physical machines. This gives rise to container orchestration tools to automate the deployment, management, scaling and availability of container-based applications. Kubernetes allows deployment and management of container-based applications at scale. Learn more about backup and disaster recovery for your Kubernetes clusters.

One of the main advantages of Kubernetes is how it brings greater reliability and stability to container-based distributed applications through the use of dynamic scheduling of containers. But how do you make sure Kubernetes itself stays up when a component or its master node goes down?
Why Do We Need Kubernetes High Availability?

Kubernetes High Availability is about setting up Kubernetes, along with its supporting components, in a way that leaves no single point of failure. A single-master cluster can easily fail, while a multi-master cluster uses multiple master nodes, each of which has access to the same worker nodes. In a single-master cluster, important components like the API server and controller manager live only on the single master node, and if it fails you cannot create more services, pods, etc. However, in a Kubernetes HA environment, these important components are replicated on multiple masters (usually three), and if any of the masters fail, the other masters keep the cluster up and running.

    Advantages of multi-master

In a Kubernetes cluster, the master node manages the etcd database, API server, controller manager and scheduler, along with all the worker nodes. If we have only a single master node and that node fails, all the worker nodes will be unschedulable and the cluster will be lost.

In a multi-master setup, by contrast, high availability for a single cluster is provided by running multiple instances of the API server, etcd, controller manager and scheduler. This not only provides redundancy but also improves network performance, because the masters divide the load among themselves.

A multi-master setup protects against a wide range of failure modes, from the loss of a single worker node to the failure of the master node's etcd service. By providing redundancy, a multi-master cluster serves as a highly available system for your end-users.

    Steps to Achieve Kubernetes HA

    Before moving to steps to achieve high-availability, let us understand what we are trying to achieve through a diagram:

    (Image Source: Kubernetes Official Documentation)

Master Node: Each master node in a multi-master environment runs its own copy of the kube-apiserver. This allows load balancing among the master nodes. Each master node also runs its own copy of the etcd database, which stores all the data of the cluster. In addition to the API server and etcd database, the master node runs the controller manager, which handles replication, and the scheduler, which schedules pods to nodes.

    Worker Node: As in a single-master cluster, the workers in a multi-master cluster run their own components, mainly orchestrating pods in the Kubernetes cluster. We need 3 machines that satisfy the Kubernetes master requirements and 3 machines that satisfy the Kubernetes worker requirements.

    For each master that has been provisioned, follow the installation guide to install kubeadm and its dependencies. In this blog we will use Kubernetes 1.10.4 to implement HA.

    Note: The cgroup driver for Docker and the kubelet differs in some versions of Kubernetes; make sure you change the cgroup driver to cgroupfs for both Docker and the kubelet. If the cgroup drivers for the kubelet and Docker differ, the master doesn’t come up after a reboot.
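    One common way to pin Docker’s cgroup driver is via daemon.json (a sketch: in practice the file lives at /etc/docker/daemon.json and the kubelet is started with --cgroup-driver=cgroupfs; a scratch directory is used below only so the example is self-contained):

```shell
# Illustrative sketch: pin Docker's cgroup driver to cgroupfs via daemon.json.
# In practice this file is /etc/docker/daemon.json.
DIR=$(mktemp -d)
cat > "$DIR/daemon.json" <<'EOF'
{
  "exec-opts": ["native.cgroupdriver=cgroupfs"]
}
EOF
# Confirm the driver setting landed in the file.
grep -o 'native.cgroupdriver=cgroupfs' "$DIR/daemon.json"
```

    After editing the real file, restart Docker and the kubelet so both pick up the same driver.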

    Setup etcd cluster

    1. Install cfssl and cfssljson

    $ curl -o /usr/local/bin/cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 
    $ curl -o /usr/local/bin/cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64
    $ chmod +x /usr/local/bin/cfssl*
    $ export PATH=$PATH:/usr/local/bin

    2. Generate certificates on master-0

    $ mkdir -p /etc/kubernetes/pki/etcd
    $ cd /etc/kubernetes/pki/etcd

    3. Create a ca-config.json file in the /etc/kubernetes/pki/etcd folder with the following content (this is the signing config referenced by the cfssl commands below).

    {
        "signing": {
            "default": {
                "expiry": "43800h"
            },
            "profiles": {
                "server": {
                    "expiry": "43800h",
                    "usages": [
                        "signing",
                        "key encipherment",
                        "server auth",
                        "client auth"
                    ]
                },
                "client": {
                    "expiry": "43800h",
                    "usages": [
                        "signing",
                        "key encipherment",
                        "client auth"
                    ]
                },
                "peer": {
                    "expiry": "43800h",
                    "usages": [
                        "signing",
                        "key encipherment",
                        "server auth",
                        "client auth"
                    ]
                }
            }
        }
    }

    4. Create a ca-csr.json file in the /etc/kubernetes/pki/etcd folder with the following content.

    {
        "CN": "etcd",
        "key": {
            "algo": "rsa",
            "size": 2048
        }
    }

    5. Create a client.json file in the /etc/kubernetes/pki/etcd folder with the following content.

    {
        "CN": "client",
        "key": {
            "algo": "ecdsa",
            "size": 256
        }
    }

    Then generate the CA and the client certificates:

    $ cfssl gencert -initca ca-csr.json | cfssljson -bare ca -
    $ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=client client.json | cfssljson -bare client

    6. Create a directory  /etc/kubernetes/pki/etcd on master-1 and master-2 and copy all the generated certificates into it.

    7. On all masters, now generate peer and etcd certs in /etc/kubernetes/pki/etcd. To generate them, we need the previous CA certificates on all masters.

    $ export PEER_NAME=$(hostname)
    $ export PRIVATE_IP=$(ip addr show eth0 | grep -Po 'inet \K[\d.]+')
    
    $ cfssl print-defaults csr > config.json
    $ sed -i 's/www.example.net/'"$PRIVATE_IP"'/' config.json
    $ sed -i 's/example.net/'"$PEER_NAME"'/' config.json
    $ sed -i '0,/CN/{s/example.net/'"$PEER_NAME"'/}' config.json
    
    $ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=server config.json | cfssljson -bare server
    $ cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=peer config.json | cfssljson -bare peer

    This replaces the default configuration with your machine’s hostname and IP address, so if you encounter any problem, just check that the hostname and IP address are correct and rerun the cfssl commands.
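    The grep -Po 'inet \K[\d.]+' extraction of the private IP can be sanity-checked against a sample ip addr line (the sample text below is made up for illustration):

```shell
# Check that the \K-based extraction pulls out just the IPv4 address:
# \K discards everything matched so far, leaving only the digits and dots.
SAMPLE='    inet 10.0.1.10/24 brd 10.0.1.255 scope global eth0'
IP=$(printf '%s\n' "$SAMPLE" | grep -Po 'inet \K[\d.]+')
echo "$IP"   # prints 10.0.1.10
```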

    8. On all masters, install etcd and set its environment file.

    $ yum install etcd -y
    $ touch /etc/etcd.env
    $ echo "PEER_NAME=$PEER_NAME" >> /etc/etcd.env
    $ echo "PRIVATE_IP=$PRIVATE_IP" >> /etc/etcd.env

    9. Now we will create a 3-node etcd cluster across the master nodes, running the etcd service on each node as a systemd unit. Create the file /etc/systemd/system/etcd.service on all masters.

    [Unit]
    Description=etcd
    Documentation=https://github.com/coreos/etcd
    Conflicts=etcd.service
    Conflicts=etcd2.service
    
    [Service]
    EnvironmentFile=/etc/etcd.env
    Type=notify
    Restart=always
    RestartSec=5s
    LimitNOFILE=40000
    TimeoutStartSec=0
    
    ExecStart=/bin/etcd --name <host_name>  --data-dir /var/lib/etcd --listen-client-urls http://<host_private_ip>:2379,http://127.0.0.1:2379 --advertise-client-urls http://<host_private_ip>:2379 --listen-peer-urls http://<host_private_ip>:2380 --initial-advertise-peer-urls http://<host_private_ip>:2380 --cert-file=/etc/kubernetes/pki/etcd/server.pem --key-file=/etc/kubernetes/pki/etcd/server-key.pem --trusted-ca-file=/etc/kubernetes/pki/etcd/ca.pem --peer-cert-file=/etc/kubernetes/pki/etcd/peer.pem --peer-key-file=/etc/kubernetes/pki/etcd/peer-key.pem --peer-trusted-ca-file=/etc/kubernetes/pki/etcd/ca.pem --initial-cluster master-0=http://<master0_private_ip>:2380,master-1=http://<master1_private_ip>:2380,master-2=http://<master2_private_ip>:2380 --initial-cluster-token my-etcd-token --initial-cluster-state new --client-cert-auth=false --peer-client-cert-auth=false
    
    [Install]
    WantedBy=multi-user.target

    10. Ensure that you replace the following placeholders:

    • <host_name>: the master’s hostname
    • <host_private_ip>: the current host’s private IP
    • <master0_private_ip>: the master-0 private IP
    • <master1_private_ip>: the master-1 private IP
    • <master2_private_ip>: the master-2 private IP
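    Substituting the placeholders by hand is error-prone; a small sed pass can fill them in from shell variables (an illustrative sketch: the variable names and the one-line template are assumptions, not part of the setup itself):

```shell
# Fill in etcd.service-style placeholders from shell variables with sed.
HOST_NAME='master-0'
HOST_PRIVATE_IP='10.0.1.10'
TEMPLATE='--name <host_name> --advertise-client-urls http://<host_private_ip>:2379'
RESULT=$(printf '%s\n' "$TEMPLATE" \
  | sed -e "s/<host_name>/$HOST_NAME/g" \
        -e "s/<host_private_ip>/$HOST_PRIVATE_IP/g")
echo "$RESULT"
```

    The same two sed expressions can be run against the real unit file with sed -i once the variables are exported.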

    11. Start the etcd service on all three master nodes and check the etcd cluster health:

    $ systemctl daemon-reload
    $ systemctl enable etcd
    $ systemctl start etcd
    
    $ etcdctl cluster-health

    This should report the cluster as healthy, with all three nodes connected.
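    The exact wording of etcdctl cluster-health output varies by etcd version, but a healthy cluster ends with a "cluster is healthy" line, which is easy to key off in scripts (the sample output below is illustrative, not captured from this setup):

```shell
# Parse a sample `etcdctl cluster-health` output and check the verdict line.
SAMPLE_OUTPUT='member 8211f1d0f64f3269 is healthy
member 91bc3c398fb3c146 is healthy
member fd422379fda50e48 is healthy
cluster is healthy'
VERDICT=$(printf '%s\n' "$SAMPLE_OUTPUT" | tail -n 1)
echo "$VERDICT"
```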

    Setup load balancer

    There are multiple cloud-provider solutions for load balancing, like AWS Elastic Load Balancer, GCE load balancing, etc. When a physical or cloud load balancer is not available, we can set up a virtual-IP load balancer that always points at a healthy master node. We are using keepalived for load balancing; install keepalived on all master nodes:

    $ yum install keepalived -y

    Create the following configuration file /etc/keepalived/keepalived.conf on all master nodes:

    ! Configuration File for keepalived
    global_defs {
      router_id LVS_DEVEL
    }
    
    vrrp_script check_apiserver {
      script "/etc/keepalived/check_apiserver.sh"
      interval 3
      weight -2
      fall 10
      rise 2
    }
    
    vrrp_instance VI_1 {
        state <state>
        interface <interface>
        virtual_router_id 51
        priority <priority>
        authentication {
            auth_type PASS
            auth_pass velotiotechnologies
        }
        virtual_ipaddress {
            <virtual ip>
        }
        track_script {
            check_apiserver
        }
    }

    • state is either MASTER (on the first master node) or BACKUP (on the other master nodes).
    • interface is generally the primary network interface; in my case it is eth0.
    • priority should be higher on the first master node (e.g., 101) and lower on the others (e.g., 100).
    • virtual_ipaddress should contain the virtual IP shared by the master nodes.
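    To make the priority rule concrete: keepalived (VRRP) elects the node with the highest priority as MASTER, and that node holds the virtual IP. A sketch of the election (node names and priorities are made-up examples matching the bullet above):

```shell
# With VRRP, the highest-priority healthy node holds the virtual IP.
PRIORITIES='master-0:101
master-1:100
master-2:100'
HOLDER=$(printf '%s\n' "$PRIORITIES" | sort -t: -k2,2 -nr | head -n 1 | cut -d: -f1)
echo "$HOLDER"   # master-0 wins the election
```

    The check_apiserver.sh track_script lowers a node’s effective priority (weight -2) when its API server stops responding, so the VIP fails over to the next-highest node.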

    Install the following health check script to /etc/keepalived/check_apiserver.sh on all master nodes:

    #!/bin/sh
    
    errorExit() {
        echo "*** $*" 1>&2
        exit 1
    }
    
    curl --silent --max-time 2 --insecure https://localhost:6443/ -o /dev/null || errorExit "Error GET https://localhost:6443/"
    if ip addr | grep -q <VIRTUAL-IP>; then
        curl --silent --max-time 2 --insecure https://<VIRTUAL-IP>:6443/ -o /dev/null || errorExit "Error GET https://<VIRTUAL-IP>:6443/"
    fi

    $ systemctl restart keepalived

    Setup three master node cluster

    Run kubeadm init on master0:

    Create a config.yaml file with the following content.

    apiVersion: kubeadm.k8s.io/v1alpha1
    kind: MasterConfiguration
    api:
      advertiseAddress: <master-private-ip>
    etcd:
      endpoints:
      - http://<master0-ip-address>:2379
      - http://<master1-ip-address>:2379
      - http://<master2-ip-address>:2379
      caFile: /etc/kubernetes/pki/etcd/ca.pem
      certFile: /etc/kubernetes/pki/etcd/client.pem
      keyFile: /etc/kubernetes/pki/etcd/client-key.pem
    networking:
      podSubnet: <podCIDR>
    apiServerCertSANs:
    - <load-balancer-ip>
    apiServerExtraArgs:
      endpoint-reconciler-type: lease

    Please ensure that the following placeholders are replaced:

    • <master-private-ip> with the private IPv4 address of the master server on which the config file resides.
    • <master0-ip-address>, <master1-ip-address>, and <master2-ip-address> with the IP addresses of your three master nodes.
    • <podCIDR> with your pod CIDR. Please read the CNI network section of the docs for more information. Some CNI providers do not require a value to be set. I am using weave-net as the pod network, hence the podCIDR will be 10.32.0.0/12.
    • <load-balancer-ip> with the virtual IP set up in the load balancer in the previous section.

    $ kubeadm init --config=config.yaml

    10. Run kubeadm init on master1 and master2:

    First of all, copy /etc/kubernetes/pki/ca.crt, /etc/kubernetes/pki/ca.key, /etc/kubernetes/pki/sa.key, and /etc/kubernetes/pki/sa.pub into the /etc/kubernetes/pki folder on master1 and master2.

    Note: Copying these files is crucial; otherwise, the other two master nodes won’t reach the Ready state.

    Copy the config file config.yaml from master0 to master1 and master2, and change <master-private-ip> to the current master host’s private IP.

    $ kubeadm init --config=config.yaml

    11. Now you can install a pod network on all three masters to bring them into the Ready state. I am using the weave-net pod network; to apply it, run:

    $ export kubever=$(kubectl version | base64 | tr -d '\n')
    $ kubectl apply -f "https://cloud.weave.works/k8s/net?k8s-version=$kubever"

    12. By default, Kubernetes doesn’t schedule any workload on the masters, so if you want to schedule workloads on the master nodes as well, remove the master taint from all the master nodes using the command:

    $ kubectl taint nodes --all node-role.kubernetes.io/master-

    13. Now that we have functional master nodes, we can join some worker nodes:

    Use the join command that was printed at the end of the kubeadm init output:

    $ kubeadm join 10.0.1.234:6443 --token llb1kx.azsbunpbg13tgc8k --discovery-token-ca-cert-hash sha256:1ad2a436ce0c277d0c5bd3826091e72badbd8417ffdbbd4f6584a2de588bf522

    High Availability in action

    To see high availability in action, shut down one of the master nodes (here, master-0) and check the state of the cluster.

    After one master node fails, the Kubernetes cluster is still accessible:

    [root@master-0 centos]# kubectl get nodes
    NAME       STATUS     ROLES     AGE       VERSION
    master-0   NotReady   master    4h        v1.10.4
    master-1   Ready      master    4h        v1.10.4
    master-2   Ready      master    4h        v1.10.4

    [root@master-0 centos]# kubectl get pods -n kube-system
    NAME                              READY     STATUS     RESTARTS   AGE
    kube-apiserver-master-0            1/1       Unknown    0          4h
    kube-apiserver-master-1            1/1       Running    0          4h
    kube-apiserver-master-2            1/1       Running    0          4h
    kube-controller-manager-master-0   1/1       Unknown    0          4h
    kube-controller-manager-master-1   1/1       Running    0          4h
    kube-controller-manager-master-2   1/1       Running    0          4h
    kube-dns-86f4d74b45-wh795          3/3       Running    0          4h
    kube-proxy-9ts6r                   1/1       Running    0          4h
    kube-proxy-hkbn7                   1/1       NodeLost   0          4h
    kube-proxy-sq6l6                   1/1       Running    0          4h
    kube-scheduler-master-0            1/1       Unknown    0          4h
    kube-scheduler-master-1            1/1       Running    0          4h
    kube-scheduler-master-2            1/1       Running    0          4h
    weave-net-6nzbq                    2/2       NodeLost   0          4h
    weave-net-ndx2q                    2/2       Running    0          4h
    weave-net-w2mfz                    2/2       Running    0          4h

    Even after one node has failed, all the important components are up and running. The cluster is still accessible, and you can create more pods, deployments, services, etc.

    [root@master-1 centos]# kubectl create -f nginx.yaml 
    deployment.apps "nginx-deployment" created
    [root@master-1 centos]# kubectl get pods -o wide
    NAME                                READY     STATUS    RESTARTS   AGE       IP              NODE
    nginx-deployment-75675f5897-884kc   1/1       Running   0          10s       10.117.113.98   master-2
    nginx-deployment-75675f5897-crgxt   1/1       Running   0          10s       10.117.113.2    master-1

    Conclusion

    High availability is an important part of reliability engineering, focused on making systems reliable and avoiding any single point of failure. At first glance, its implementation might seem quite complex, but high availability brings tremendous advantages to systems that require increased stability and reliability. Using a highly available cluster is one of the most important aspects of building a solid infrastructure.

  • Getting Started With Kubernetes Operators (Golang Based) – Part 3

    Introduction

    In the first part, getting started with Kubernetes operators (Helm based), and the second part, getting started with Kubernetes operators (Ansible based), of this Introduction to Kubernetes operators blog series, we covered various concepts related to Kubernetes operators and created a Helm-based operator and an Ansible-based operator, respectively. In this final part, we will build a Golang-based operator. In the Helm-based operator we executed a Helm chart whenever the custom object type of our application changed, and in the Ansible-based operator we similarly executed an Ansible role. In a Golang-based operator, we write the code for the action to perform (the reconcile logic) whenever the state of our custom object changes. This makes Golang-based operators quite powerful and flexible, while at the same time making them the most complex to build of the three types.

    What Will We Build?

    The database server we deployed as part of our book store app in the previous blogs didn’t have any persistent volume attached to it, so we would lose data if the pod restarted. To avoid this, we will attach a persistent volume backed by the host (the K8s worker nodes) and run our database as a StatefulSet rather than a Deployment. We will also add a feature to expand the persistent volume associated with the MongoDB pod.

    Building the Operator

    1. Set up the project:  

    operator-sdk new bookstore-operator --dep-manager=dep

    INFO[0000] Generating api version blog.velotio.com/v1alpha1 for kind BookStore. 
    INFO[0000] Created pkg/apis/blog/group.go               
    INFO[0001] Created pkg/apis/blog/v1alpha1/bookstore_types.go 
    INFO[0001] Created pkg/apis/addtoscheme_blog_v1alpha1.go 
    INFO[0001] Created pkg/apis/blog/v1alpha1/register.go   
    INFO[0001] Created pkg/apis/blog/v1alpha1/doc.go        
    INFO[0001] Created deploy/crds/blog.velotio.com_v1alpha1_bookstore_cr.yaml 
    INFO[0009] Created deploy/crds/blog.velotio.com_bookstores_crd.yaml 
    INFO[0009] Running deepcopy code-generation for Custom Resource group versions: [blog:[v1alpha1], ] 
    INFO[0010] Code-generation complete.                    
    INFO[0010] Running OpenAPI code-generation for Custom Resource group versions: [blog:[v1alpha1], ] 
    INFO[0011] Created deploy/crds/blog.velotio.com_bookstores_crd.yaml 
    INFO[0011] Code-generation complete.                    
    INFO[0011] API generation complete.

    The above command creates the bookstore-operator folder in our $GOPATH/src. Here we have set --dep-manager to dep, which signifies that we want to use dep for managing dependencies; by default, Go modules are used. As we have seen earlier, the operator SDK creates all the necessary folder structure for us inside the bookstore-operator folder.

    2. Add the custom resource definition

    operator-sdk add api --api-version=blog.velotio.com/v1alpha1 --kind=BookStore

    The above command creates the CRD and CR for the BookStore type. It also creates the Golang structs (pkg/apis/blog/v1alpha1/bookstore_types.go) for the BookStore type, registers the custom type with the scheme (pkg/apis/blog/v1alpha1/register.go), and generates deep-copy methods. Here we can see that all the generic tasks are done by the operator framework itself, allowing us to focus on building our object and the controller. We will update the spec of the BookStore type to include two custom types, BookApp and BookDB.

    type BookStoreSpec struct {
        BookApp BookApp `json:"bookApp,omitempty"`
        BookDB  BookDB  `json:"bookDB,omitempty"`
    }

    type BookApp struct {
        Repository      string             `json:"repository,omitempty"`
        Tag             string             `json:"tag,omitempty"`
        ImagePullPolicy corev1.PullPolicy  `json:"imagePullPolicy,omitempty"`
        Replicas        int32              `json:"replicas,omitempty"`
        Port            int32              `json:"port,omitempty"`
        TargetPort      int                `json:"targetPort,omitempty"`
        ServiceType     corev1.ServiceType `json:"serviceType,omitempty"`
    }

    type BookDB struct {
        Repository      string            `json:"repository,omitempty"`
        Tag             string            `json:"tag,omitempty"`
        ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
        Replicas        int32             `json:"replicas,omitempty"`
        Port            int32             `json:"port,omitempty"`
        DBSize          resource.Quantity `json:"dbSize,omitempty"`
    }

    Let’s also update the BookStore CR (deploy/crds/blog.velotio.com_v1alpha1_bookstore_cr.yaml):

    apiVersion: blog.velotio.com/v1alpha1
    kind: BookStore
    metadata:
      name: example-bookstore
    spec:
      bookApp: 
        repository: "akash125/pyapp"
        tag: latest
        imagePullPolicy: "IfNotPresent"
        replicas: 1
        port: 80
        targetPort: 3000
        serviceType: "LoadBalancer"
      bookDB:
        repository: "mongo"
        tag: latest
        imagePullPolicy: "IfNotPresent"
        replicas: 1
        port: 27017
        dbSize: 2Gi

    3. Add the bookstore controller

    operator-sdk add controller --api-version=blog.velotio.com/v1alpha1 --kind=BookStore

    INFO[0000] Generating controller version blog.velotio.com/v1alpha1 for kind BookStore. 
    INFO[0000] Created pkg/controller/bookstore/bookstore_controller.go 
    INFO[0000] Created pkg/controller/add_bookstore.go      
    INFO[0000] Controller generation complete.

    The above command adds the bookstore controller (pkg/controller/bookstore/bookstore_controller.go) to the project and also adds it to the manager.

    If we take a look at the add function in the bookstore_controller.go file, we can see that a new controller is created here and added to the manager, so that the manager can start the controller when it (the manager) comes up. The add(mgr manager.Manager, r reconcile.Reconciler) function is called by the public function Add(mgr manager.Manager), which also creates a new reconciler object and passes it to add, where the controller is associated with the reconciler. In the add function, we also set the type of object (BookStore) that the controller will watch.

    // Watch for changes to primary resource BookStore
    	err = c.Watch(&source.Kind{Type: &blogv1alpha1.BookStore{}}, &handler.EnqueueRequestForObject{})
    	if err != nil {
    		return err
    	}

    This ensures that for any events related to any object of BookStore type, a reconcile request (a namespace/name key) is sent to the Reconcile method associated with the reconciler object (ReconcileBookStore) here.

    4. Build the reconcile logic

    The reconcile logic is implemented inside the Reconcile method of the reconciler object of the custom type which implements the reconcile loop.

    As a part of our reconcile logic we will do the following

    1. Create the bookstore app deployment if it doesn’t exist.
    2. Create the bookstore app service if it doesn’t exist.
    3. Create the Mongodb statefulset if it doesn’t exist.
    4. Create the Mongodb service if it doesn’t exist.
    5. Ensure deployments and services match their desired configurations like the replica count, image tag, service port, size of the PV associated with the Mongodb statefulset etc.

     There are three possible events that can happen with the BookStore object

    1. The object got created: Whenever an object of kind BookStore is created we create all the k8s resources we mentioned above
    2. The object has been updated: When the object gets updated, we update all the k8s resources associated with it.
    3. The object has been deleted: When the object gets deleted, we don’t need to do anything, because while creating the K8s objects we set the BookStore type as their owner, which ensures that all the K8s objects associated with it are automatically deleted when the BookStore object is deleted.

    On receiving the reconcile request, the first step is to look up the object.

    func (r *ReconcileBookStore) Reconcile(request reconcile.Request) (reconcile.Result, error) {
    	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
    	reqLogger.Info("Reconciling BookStore")
    
    	// Fetch the BookStore instance
    	bookstore := &blogv1alpha1.BookStore{}
    	err := r.client.Get(context.TODO(), request.NamespacedName, bookstore)

    If the object is not found, we assume that it got deleted and don’t requeue the request considering the reconcile to be successful.

    If any error occurs while doing the reconcile, we return the error; whenever we return a non-nil error value, the controller requeues the request.

    In the reconcile logic, we call the BookStore method, which creates or updates all the k8s objects associated with the BookStore object, depending on whether the object has been created or updated.

    func (r *ReconcileBookStore) BookStore(bookstore *blogv1alpha1.BookStore) error {
        reqLogger := log.WithValues("Namespace", bookstore.Namespace)

        mongoDBSvc := getmongoDBSvc(bookstore)
        msvc := &corev1.Service{}
        err := r.client.Get(context.TODO(), types.NamespacedName{Name: "mongodb-service", Namespace: bookstore.Namespace}, msvc)
        if err != nil {
            if errors.IsNotFound(err) {
                controllerutil.SetControllerReference(bookstore, mongoDBSvc, r.scheme)
                err = r.client.Create(context.TODO(), mongoDBSvc)
                if err != nil {
                    return err
                }
            } else {
                return err
            }
        } else if !reflect.DeepEqual(mongoDBSvc.Spec, msvc.Spec) {
            mongoDBSvc.ObjectMeta = msvc.ObjectMeta
            controllerutil.SetControllerReference(bookstore, mongoDBSvc, r.scheme)
            err = r.client.Update(context.TODO(), mongoDBSvc)
            if err != nil {
                return err
            }
            reqLogger.Info("mongodb-service updated")
        }

        mongoDBSS := getMongoDBStatefulsets(bookstore)
        mss := &appsv1.StatefulSet{}
        err = r.client.Get(context.TODO(), types.NamespacedName{Name: "mongodb", Namespace: bookstore.Namespace}, mss)
        if err != nil {
            if errors.IsNotFound(err) {
                reqLogger.Info("mongodb statefulset not found, will be created")
                controllerutil.SetControllerReference(bookstore, mongoDBSS, r.scheme)
                err = r.client.Create(context.TODO(), mongoDBSS)
                if err != nil {
                    return err
                }
            } else {
                reqLogger.Info("failed to get mongodb statefulset")
                return err
            }
        } else if !reflect.DeepEqual(mongoDBSS.Spec, mss.Spec) {
            r.UpdateVolume(bookstore)
            mongoDBSS.ObjectMeta = mss.ObjectMeta
            mongoDBSS.Spec.VolumeClaimTemplates = mss.Spec.VolumeClaimTemplates
            controllerutil.SetControllerReference(bookstore, mongoDBSS, r.scheme)
            err = r.client.Update(context.TODO(), mongoDBSS)
            if err != nil {
                return err
            }
            reqLogger.Info("mongodb statefulset updated")
        }

        bookStoreSvc := getBookStoreAppSvc(bookstore)
        bsvc := &corev1.Service{}
        err = r.client.Get(context.TODO(), types.NamespacedName{Name: "bookstore-svc", Namespace: bookstore.Namespace}, bsvc)
        if err != nil {
            if errors.IsNotFound(err) {
                controllerutil.SetControllerReference(bookstore, bookStoreSvc, r.scheme)
                err = r.client.Create(context.TODO(), bookStoreSvc)
                if err != nil {
                    return err
                }
            } else {
                reqLogger.Info("failed to get bookstore service")
                return err
            }
        } else if !reflect.DeepEqual(bookStoreSvc.Spec, bsvc.Spec) {
            bookStoreSvc.ObjectMeta = bsvc.ObjectMeta
            bookStoreSvc.Spec.ClusterIP = bsvc.Spec.ClusterIP
            controllerutil.SetControllerReference(bookstore, bookStoreSvc, r.scheme)
            err = r.client.Update(context.TODO(), bookStoreSvc)
            if err != nil {
                return err
            }
            reqLogger.Info("bookstore service updated")
        }

        bookStoreDep := getBookStoreDeploy(bookstore)
        bsdep := &appsv1.Deployment{}
        err = r.client.Get(context.TODO(), types.NamespacedName{Name: "bookstore", Namespace: bookstore.Namespace}, bsdep)
        if err != nil {
            if errors.IsNotFound(err) {
                controllerutil.SetControllerReference(bookstore, bookStoreDep, r.scheme)
                err = r.client.Create(context.TODO(), bookStoreDep)
                if err != nil {
                    return err
                }
            } else {
                reqLogger.Info("failed to get bookstore deployment")
                return err
            }
        } else if !reflect.DeepEqual(bookStoreDep.Spec, bsdep.Spec) {
            bookStoreDep.ObjectMeta = bsdep.ObjectMeta
            controllerutil.SetControllerReference(bookstore, bookStoreDep, r.scheme)
            err = r.client.Update(context.TODO(), bookStoreDep)
            if err != nil {
                return err
            }
            reqLogger.Info("bookstore deployment updated")
        }

        r.client.Status().Update(context.TODO(), bookstore)
        return nil
    }

    The implementation of the above method is a bit hacky, but it gives an idea of the flow. In the above function, we can see that we set the BookStore type as the owner of all the resources via controllerutil.SetControllerReference(bookstore, bookStoreDep, r.scheme), as discussed earlier. If we look at the owner references on these objects, we will see something like this:

    ownerReferences:
      - apiVersion: blog.velotio.com/v1alpha1
        blockOwnerDeletion: true
        controller: true
        kind: BookStore
        name: example-bookstore
        uid: 0ef42889-deb4-11e9-ba56-42010a800256
      resourceVersion: "20295281"

    5. Deploy the operator and verify that it works

    The approach to deploying and verifying the book store application is similar to what we did in the previous two blogs, the only difference being that we have now deployed MongoDB as a StatefulSet, so even if we restart the pod, the information we stored will still be available.

    6. Verify volume expansion

    To update the volume associated with the MongoDB instance, we first need to update the size of the volume we specified while creating the bookstore object. In the example above I had set it to 2Gi; let’s change it to 3Gi and update the bookstore object.

    Once the bookstore object is updated, if we describe the MongoDB PVC, we will see that it still has a 2Gi PV, but under its conditions we will see something like this:

    Conditions:
      Type                      Status  LastProbeTime                     LastTransitionTime                Reason  Message
      ----                      ------  -----------------                 ------------------                ------  -------
      FileSystemResizePending   True    Mon, 01 Jan 0001 00:00:00 +0000   Mon, 30 Sep 2019 15:07:01 +0530           Waiting for user to (re-)start a pod to finish file system resize of volume on node.

    It is clear from the message that we need to restart the pod for the volume resize to take effect. Once we delete the pod, it will be restarted and the PVC will be updated to reflect the expanded volume size.

    The complete code is available here.

    Conclusion

    Golang-based operators are built mostly for stateful applications like databases. An operator can automate complex operational tasks, allowing us to run applications with ease. At the same time, building and maintaining one can be quite complex, and we should build one only when we are fully convinced that our requirements can’t be met by any other type of operator. Operators are an interesting and emerging area in Kubernetes, and I hope this blog series on getting started with them helps readers learn the basics.

  • A Beginner’s Guide to Kubernetes Python Client

    A couple of weeks back, we started working with the Kubernetes Python client to carry out basic operations on its components/ resources, and that’s when we realized how few resources there were (guides, docs) on the internet. So, we experimented and decided to share our findings with the community.

    This article is targeted towards an audience that is familiar with Kubernetes, its usage, and its architecture. This is not a simple Kubernetes guide; it's about Kubernetes using Python, so as we move further, we may shed light on a few things that are required, but a few will be left for self-exploration.

    Kubernetes Overview

    Kubernetes is an open-source container orchestration tool, largely used to simplify the process of deployment, maintenance, etc. in application development. Kubernetes is built to offer highly available, scalable, and reliable applications.

    Generally, kubectl commands are used to create, list, and delete the Kubernetes resources, but for this article, we put on a developer’s hat and use the Python way of doing things. In this article, we learn how to create, manage, and interact with Kubernetes resources using the Kubernetes’ Python library.

    But why, you may ask?

    Well, having the option of doing things programmatically opens up endless exciting possibilities for developers. Using Python, we can:

    • Create and manage Kubernetes resources dynamically
    • Apply algorithms that change the state or amount of resources in our cluster
    • Build a more robust application with solid alerting and monitoring features

    So, let us begin:

    Kubernetes achieves what it does with the help of its resources. These resources are the building blocks for developing a scalable, reliable application.

    Let’s briefly explore these resources to understand what they are and how exactly they work together in Kubernetes:

    • Node: Simple server, a physical/virtual machine.
    • Pod: Smallest unit of Kubernetes, provides abstraction over a container. Creates a running env/layer on top of the container. Usually runs only one application container but can run multiple as well.
    • Service: Static IP address for the pod. Remains the same even after the pod dies. Also doubles as a load-balancer for multiple pods:
      a) External services are used to make the app accessible through external sources.
      b) Internal services are used when accessibility is to be restricted.
    • Ingress: Additional layer of security and address translation for services. All requests first go to the ingress and are then forwarded to the service.
    • ConfigMap: External configuration of your app, like URLs of databases or other services.
    • Secret: Stores secret/sensitive data like DB credentials, encoded in base64 format.
    • Volumes: Kubernetes does not manage any data persistence on its own. Volumes are used to persist data generated by pods. A volume attaches physical storage to the pod, which can be either local or remote, such as cloud or on-premise servers.
    • Deployment: Defines the blueprint of the pods and their replication factor. A layer of abstraction over pods that makes configuration convenient.
    • StatefulSet: Same idea as a Deployment, but used for stateful applications (like databases) to avoid data inconsistency.

    These are a few of the basic Kubernetes resources. If you want to explore the rest of these resources, you can click here.

    Apart from these resources, there are also namespaces in Kubernetes. You will come across them quite a few times in this article, so here are some noteworthy points for Kubernetes namespaces:

    Kubernetes namespaces: Used to group/organize resources in the cluster. A namespace is like a virtual cluster inside a cluster. Namespaces are used to:

    1. Structure resources
    2. Avoid conflicts between teams
    3. Share services between different environments
    4. Restrict access
    5. Limit resources

    Setting up the system

    The Kubernetes library comes to our aid with quite a few modules; the ones featured in this article are the client and config modules, and we will be using these two heavily. So, let's install the Kubernetes Python client.

    To install the Kubernetes Python client, we make use of Python’s standard package installer pip:

    pip install kubernetes

    For installation from the source, we can refer to this guide from the official Python client git repository. 

    Now that we have the kubernetes package installed, we can import the modules we need:

    from kubernetes import client, config

    Loading cluster configurations

    To load our cluster configurations, we can use one of the following methods:

    config.load_kube_config()      # outside the cluster: reads your local ~/.kube/config
    # or
    config.load_incluster_config() # inside a pod: uses the mounted service account

    Executing one of these loads your cluster configuration: load_kube_config() reads the kubeconfig file (typically .kube/config), while load_incluster_config() picks up the service account credentials mounted into the pod.
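    Since the right loader depends on where the code runs, a common pattern is to try the in-cluster loader first and fall back to the local kubeconfig. A minimal sketch (the load_config wrapper name is our own, not part of the library):

    ```python
    from kubernetes import config

    def load_config():
        """Load cluster config: in-cluster first, local kubeconfig as fallback."""
        try:
            # Works only when running inside a pod with a service account.
            config.load_incluster_config()
        except config.ConfigException:
            # Fall back to the local kubeconfig for development machines.
            config.load_kube_config()
    ```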

    Interacting with Kubernetes Resources

    Now that we have loaded the configurations, we can use the client module to interact with the resources.

    Get Resources: kubectl get commands are used to list all kinds of resources in a cluster, for example:

    – List nodes: To list all the nodes in the cluster, we fire following kubectl command:

    kubectl get nodes  # lists all the nodes

    In Python, we instantiate CoreV1Api class from client module:

    v1 = client.CoreV1Api()
    v1.list_node()  
    # returns a JSON with all the info like spec, metadata etc, for each node

    – List namespaces: A cluster comes with a few namespaces by default; to list them with kubectl:

    kubectl get namespaces  
    #	NAME          		 STATUS   	AGE
    #	default       		 Active   	94d
    #	kube-public   		 Active   	94d
    #	kube-system   		 Active   	94d

    In the Python client, we can achieve the same by:

    v1.list_namespace()
    """
    returns a JSON with all the info like spec, metadata for each namespace
    For eg:
    {'api_version': 'v1',
     'items': [{'api_version': None,
            	'kind': None,
            	'metadata': {'annotations': None,
                         	'cluster_name': None,
                         	'creation_timestamp': datetime.datetime(2021, 2, 11, 11, 29, 32, tzinfo=tzutc()),
                         	'deletion_grace_period_seconds': None,
                         	'deletion_timestamp': None,
                         	'finalizers': None,
                         	'generate_name': None,
                         	'generation': None,
                         	'labels': None,
                         	'managed_fields': [{'api_version': 'v1',
                                             	'fields_type': 'FieldsV1',
                                             	'fields_v1': {'f:status': {'f:phase': {}}},
                                             	'manager': 'kube-apiserver',
                                             	'operation': 'Update',
                                             	'time': datetime.datetime(2021, 2, 11, 11, 29, 32, tzinfo=tzutc())}],
                         	'name': 'default',
                         	'namespace': None,
                         	'owner_references': None,
                         	'resource_version': '199',
                         	'self_link': None,
                         	'uid': '3a362d64-437d-45b5-af19-4af9ae2c75fc'},
            	'spec': {'finalizers': ['kubernetes']},
            	'status': {'conditions': None, 'phase': 'Active'}}],
    'kind': 'NamespaceList',
     'metadata': {'_continue': None,
              	'remaining_item_count': None,
              	'resource_version': '69139',
              	'self_link': None}}
    """

    Similarly, we can list all the resources or resources in a particular namespace.

    For example, to list pods or persistent volume claims across all namespaces:

    v1.list_pod_for_all_namespaces()
    v1.list_persistent_volume_claim_for_all_namespaces()
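    Each of these list_* calls returns a typed model object rather than raw JSON, so results are usually consumed via attribute access over its .items list. A small helper, as a sketch (pod_summary is our own name, not a library function):

    ```python
    def pod_summary(pod_list):
        """Reduce a pod list response to (namespace, name, phase) tuples."""
        return [(p.metadata.namespace, p.metadata.name, p.status.phase)
                for p in pod_list.items]

    # Usage against a live cluster (assumes v1 = client.CoreV1Api()):
    # pod_summary(v1.list_pod_for_all_namespaces())
    ```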

    For all the resources that are grouped within a given namespace, we can use:

    # v1.list_namespaced_pod(<namespace>)
    v1.list_namespaced_pod(namespace='default')

    # v1.list_namespaced_service(<namespace>)
    v1.list_namespaced_service(namespace='default')

    and so on for the other namespaced resources.
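    These namespaced list calls also accept a label_selector parameter, which takes the same syntax as kubectl's -l flag (e.g., app=nginx). A tiny convenience helper for building one from a dict, as a sketch (to_selector is our own helper, not part of the library):

    ```python
    def to_selector(labels):
        """Build a selector string like 'app=nginx,tier=frontend' from a dict."""
        return ','.join(f'{k}={v}' for k, v in labels.items())

    # Usage (assumes v1 = client.CoreV1Api()):
    # v1.list_namespaced_pod(namespace='default',
    #                        label_selector=to_selector({'app': 'nginx'}))
    ```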

    Creating Resources: The usual way to create resources in Kubernetes is to use a kubectl create command with required parameters (defaults if not specified) or to use kubectl apply command, which takes a YAML/JSON format configuration file as input. This file contains all the specifications and metadata for the component to be created. For example:

    kubectl create deployment my-nginx-depl --image=nginx
    kubectl apply -f nginx_depl.yaml

    Where the contents of nginx_depl.yaml could be as follows:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: nginx-deployment
      labels:
        app: nginx
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: nginx
      template:
        metadata:
          labels:
            app: nginx
        spec:
          containers:
          - name: nginx
            image: nginx:1.14.2
            ports:
            - containerPort: 80
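    As an aside, the Python client can also consume such a YAML file directly through its utils module, which is the closest analogue to kubectl apply (strictly speaking it only creates, and fails if the objects already exist). A sketch, assuming nginx_depl.yaml is present locally:

    ```python
    from kubernetes import client, config, utils

    def apply_yaml(path):
        """Create the objects described in a YAML manifest (create-only)."""
        config.load_kube_config()
        utils.create_from_yaml(client.ApiClient(), path)

    # apply_yaml('nginx_depl.yaml')
    ```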

    To create resources in Python, though, we use create functions from the same instance of CoreV1Api class:

    # v1.create_namespaced_pod(<namespace>, <body>)
    # v1.create_namespaced_persistent_volume_claim(<namespace>, <body>)

    So, basically, we just need two things: a string namespace in which we want our resource to live, and a body.

    This body is the same as the config.yaml file that we saw earlier. But, how exactly do we create or use that in our code? We utilize the component specific classes that this library offers us for this.

    Let us take an example, to create a pod we use V1Pod class from the Kubernetes.client.

    An instance of this V1Pod contains all the params like kind, metadata, spec, etc., so all we need to do is pass them in. While we are at it, let's create the metadata and spec as well, using a couple more classes.

    1. V1ObjectMeta: This takes all the fields that can be part of metadata as parameters, e.g.

    metadata = client.V1ObjectMeta(name='md1')
    
    # We could also set fields by accessing them through instance like:
    metadata.name = 'md2'

    2. V1Container: If you recall the brief definition of Kubernetes pods given earlier, pods are just layers above containers, which means we will have to provide the container(s) that the pod abstracts over. The V1Container class from the Kubernetes client does just what we need.

    These containers run the specified image, with their name taken as a parameter by the object. Containers also have several other parameters, like volume_mounts and ports, that can be passed at instantiation or set later through the object reference.

    We create a container using:

    # container1 = client.V1Container(name=<name>, image=<image>), e.g.:
    container1 = client.V1Container(name='my-container', image='nginx')
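    As a sketch of those extra parameters, here is a container that also declares a port, mirroring the YAML shown earlier (V1ContainerPort is the model class for entries under ports):

    ```python
    from kubernetes import client

    # Container with an image and a declared container port
    container = client.V1Container(
        name='nginx',
        image='nginx:1.14.2',
        ports=[client.V1ContainerPort(container_port=80)],
    )
    ```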

    Kubernetes pods can have multiple containers running inside, hence the V1PodSpec class expects a list of those while we create a pod spec.

    containers = [container1, container2, ...]

    3. V1PodSpec: Depending on the component we are working on, the class for its spec and params change. For a pod, we can use V1PodSpec as:

    # pod_spec = client.V1PodSpec(<containers_list>)
    pod_spec = client.V1PodSpec(containers=containers)

    Now that we have both metadata and spec, let’s construct the pod’s body:

    pod_body = client.V1Pod(metadata=metadata, spec=pod_spec, kind='Pod', api_version='v1')

    And then, finally we could pass these to create a pod:

    pod = v1.create_namespaced_pod(namespace='my-namespace', body=pod_body)

    And there you have it, that’s how you create a pod.
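    Note that create_namespaced_pod returns as soon as the API server accepts the object, not when the pod is actually running, so a common follow-up is to poll the pod's status. A sketch (wait_for_running is our own helper, not a library call):

    ```python
    import time

    def wait_for_running(v1, name, namespace, timeout=60):
        """Poll the pod's status.phase until it is Running, or time out."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            pod = v1.read_namespaced_pod(name=name, namespace=namespace)
            if pod.status.phase == 'Running':
                return pod
            time.sleep(2)
        raise TimeoutError(f'pod {namespace}/{name} not Running after {timeout}s')
    ```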

    Similarly, we can create other resources, although not all resources take the same set of parameters. For example, a PersistentVolume (PV for short) is not namespaced; it is a cluster-wide resource, so naturally it won't expect a namespace parameter.
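    To illustrate, creating a PV goes through create_persistent_volume, which takes only a body. The hostPath-backed 1Gi PV below is a sketch meant for single-node test clusters (the names and size are our own):

    ```python
    from kubernetes import client

    # PersistentVolumes are cluster-scoped: note the absence of a namespace.
    pv = client.V1PersistentVolume(
        api_version='v1',
        kind='PersistentVolume',
        metadata=client.V1ObjectMeta(name='my-pv'),
        spec=client.V1PersistentVolumeSpec(
            capacity={'storage': '1Gi'},
            access_modes=['ReadWriteOnce'],
            host_path=client.V1HostPathVolumeSource(path='/tmp/my-pv'),
        ),
    )
    # v1.create_persistent_volume(body=pv)  # no namespace argument
    ```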

    Fetching Logs:

    When it comes to monitoring and debugging Kubernetes’ resources, logs play a major role. Using the Kubernetes Python client, we can fetch logs for resources. For example, to fetch logs for a pod:

    Using kubectl:

    # kubectl logs pod_name
    kubectl logs my-pod

    Using Python:

    # pod_logs = v1.read_namespaced_pod_log(<pod_name>, <namespace>)
    pod_logs = v1.read_namespaced_pod_log(name='my-app', namespace='default')

    Deleting Resources: For deletion, we will be using the same class that we have been using so far, i.e., kubernetes.client.CoreV1Api.

    There are functions that directly handle the deletion of each resource, for example:

    # v1.delete_namespaced_pod(<pod_name>, <namespace>)
    v1.delete_namespaced_pod(name='my-app', namespace='default')

    Pass the required parameters and the deletion will take place as expected.
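    Server-side failures surface as kubernetes.client.rest.ApiException, and for deletion it is common to treat a 404 as "already gone." A sketch (delete_pod_if_exists is our own helper):

    ```python
    from kubernetes.client.rest import ApiException

    def delete_pod_if_exists(v1, name, namespace='default'):
        """Delete a pod; return False instead of raising if it does not exist."""
        try:
            v1.delete_namespaced_pod(name=name, namespace=namespace)
            return True
        except ApiException as e:
            if e.status == 404:
                return False
            raise
    ```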

    Complete Example for creating a Kubernetes Pod:

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # List all namespaces and extract their names
    namespaces_list = v1.list_namespace()
    namespaces = [item.metadata.name for item in namespaces_list.items]

    # List all pods in the 'default' namespace
    pods_list = v1.list_namespaced_pod(namespace='default')
    pods = [item.metadata.name for item in pods_list.items]

    # Build the pod body: container -> spec + metadata -> pod
    containers = []
    container1 = client.V1Container(name='my-nginx-container', image='nginx')
    containers.append(container1)

    pod_spec = client.V1PodSpec(containers=containers)
    pod_metadata = client.V1ObjectMeta(name='my-pod', namespace='default')

    pod_body = client.V1Pod(api_version='v1', kind='Pod', metadata=pod_metadata, spec=pod_spec)

    v1.create_namespaced_pod(namespace='default', body=pod_body)

    pod_logs = v1.read_namespaced_pod_log(name='my-pod', namespace='default')

    v1.delete_namespaced_pod(namespace='default', name='my-pod')

    Conclusion

    There are quite a lot of ways this article could have been written, but as we conclude, it’s quite evident that we have barely scratched the surface. There are many more interesting, advanced things that we can do with this library, but those are beyond the scope of this article.

    We can do almost all the operations with the Python client that we usually do with kubectl on Kubernetes resources. We hope that we managed to keep the content both interesting and informative. 

    If you’re looking for a comprehensive guide on Kubernetes or something interesting to do with it, don’t worry, we’ve got you covered. You can refer to a few of our other articles and might find just what you need:

    1. Kubernetes CSI in Action: Explained with Features and Use Cases

    2. Continuous Deployment with Azure Kubernetes Service, Azure Container Registry & Jenkins

    3. Demystifying High Availability in Kubernetes Using Kubeadm

    References

    1. Official Kubernetes documentation: https://kubernetes.io/docs

    2. Kubernetes Resources: https://kubernetes.io/docs/reference/glossary/?core-object=true

    3. Kubernetes Python client: https://github.com/kubernetes-client/python

    4. kubectl: https://kubernetes.io/docs/reference/kubectl/overview/