Tag: python

  • Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!

    The amount of data in our world is growing exponentially, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified, distributed data processing engine built for exactly this purpose. It processes Big Data efficiently by splitting the work into chunks and distributing those chunks across computation resources on multiple nodes. It can handle up to petabytes of data (millions of gigabytes), and because it processes data in memory, it is fast.

    We talked about processing Big Data in Spark, but Spark does not store any data itself the way a file system does. So, to process data in Spark, we must read data from a data source, clean or transform it, and then store the result in a target data source. Data sources can be files, APIs, databases, or streams.

    Database management systems have been around for decades. Many applications generate huge amounts of data and store it in database management systems, and we often need to connect Spark to such a database to process that data.

    In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on different methods of reading and writing, which will help us move terabytes of data in an efficient manner.

    Reading / Writing data from/to Database using Spark:

    To read or write data from/to a database, we need to perform a few basic steps regardless of the programming language or framework we are using. What follows is an overview of those steps.

    Step 1: Register Driver or Use Connector

    Get the respective driver of your database and register the driver, or use the connector to connect to the database.

    Step 2: Make a connection

    Next, the driver or connector makes a connection to the database.

    Step 3: Run query statement

    Using the connection created in the previous step, execute the query, which will return the result.

    Step 4: Process result

    Process the result from the previous step as per your requirement.

    Step 5: Close the connection

    Finally, close the cursor and the connection to release database resources.
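
    The five steps above can be sketched in a few lines. Here SQLite from the Python standard library stands in for a real database server so the sketch is self-contained; the table name and rows are illustrative:

```python
import sqlite3

# Step 1 & 2: the sqlite3 module bundles its own driver;
# connect() makes the connection (for a real server you would pass host, user, etc.).
conn = sqlite3.connect(":memory:")

# Step 3: run query statements through a cursor.
cursor = conn.cursor()
cursor.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")
cursor.execute("INSERT INTO covid_data VALUES ('Uttarakhand', 3259)")
cursor.execute("SELECT state, confirmed FROM covid_data")

# Step 4: process the result.
rows = cursor.fetchall()
print(rows)  # [('Uttarakhand', 3259)]

# Step 5: close the cursor and the connection.
cursor.close()
conn.close()
```

    With Postgres the flow is identical; only the driver and the connection parameters change.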

    Dataset we are using:

    Covid data

    This dataset contains details of COVID-19 cases across states and districts. It has columns such as State, District, Confirmed, Recovered, Deceased, Other, Tested, and Date.

    You can load this dataset in any of the databases you work with and can try out the entire discussion practically.

    The following image shows ten records of the entire dataset.

    Single-partition Spark program:

    from pyspark.sql import SparkSession

    ## Creating a spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()

    hostname = "localhost"
    jdbc_port = 5432
    dbname = "aniket"
    username = "postgres"
    password = "pass@123"

    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)

    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "aniket") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .load()

    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "spark_schema.zipcode_table") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .save()

    Spark provides an API to read data from a database, and it is very simple to use. First, we create a Spark session and add the JDBC driver to Spark; the driver can be added through the program itself, as here, or via the shell.

    The first line of code imports the SparkSession class. This is the entry point to programming Spark with the Dataset and DataFrame API.

    Next, we create a Spark session running on the local machine, which will be used for interaction with our Spark application. We specify the name of our application using appName(), which in our case is ‘Databases.’ This app name will be shown on the Web UI for our cluster. We can then specify any configuration for the Spark application using config(); here we specify the Postgres JDBC driver package, which will be used to create a connection with the Postgres database. You can specify the driver of any of the available databases.

    To connect to the database, we must have a hostname, port, database name, username, and password. Those details are stored in the hostname, jdbc_port, dbname, username, and password variables and combined into the JDBC URL.

    Now look at the reading part of the snippet. At this point we have our Spark session and all the information we need to connect to the database. Using the Spark read API, we read the data from the database. This creates a single connection to the Postgres database from one of the cores allocated to the Spark application, and through this connection it reads the data into the table_data_df dataframe. Even if we have multiple cores for our application, Spark still creates only one connection on one core; the rest of the cores sit unused. We will discuss how to utilize all cores shortly.

    The writing part of the snippet works the same way. We have the data now, so let’s write it to the database using the Spark write API. This also creates only one connection to the database from one of the allocated cores; even with more cores available, the code above still uses only one.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 113ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/5ms)
    22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    Multiple-partition Spark program:

    from pyspark.sql import SparkSession

    ## Creating a spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local[4]") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()

    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"

    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)

    partition_column = 'date'
    lower_bound = '2021-02-20'
    upper_bound = '2021-02-28'
    num_partitions = 4

    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("partitionColumn", partition_column) \
        .option("lowerBound", lower_bound) \
        .option("upperBound", upper_bound) \
        .option("numPartitions", num_partitions) \
        .load()

    table_data_df.show()

    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data_output") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("numPartitions", num_partitions) \
        .save()

    As promised in the last section, let’s discuss how to optimize resource utilization. Previously we had only one connection, utilizing very limited resources and leaving the rest idle. To get over this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is specified, and numPartitions must be specified as well. They describe how to partition the table when reading in parallel from multiple workers. Each partition gets its own core with its own connection performing the reads or writes, thus making the database operations parallel.

    This is a far more efficient way of reading and writing data from databases in Spark than doing it with one partition.

    Partitions are decided by the Spark API in the following way.

    Let’s consider an example where:

    lowerBound: 0

    upperBound: 1000

    numPartitions: 10

    Stride is equal to (1000 - 0) / 10 = 100, and the partitions correspond to the following queries:

    SELECT * FROM table WHERE partitionColumn < 100 OR partitionColumn IS NULL

    SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200

    ...

    SELECT * FROM table WHERE partitionColumn >= 900

    Each range is inclusive on its lower bound and exclusive on its upper bound; the first partition also collects NULLs, and the last partition is open-ended so no rows are lost.

    Now we have data in multiple partitions. Each executor can hold one or more partitions, depending on the cluster configuration. Suppose we have 10 cores and 10 partitions: each core can fetch one partition of data, so all 10 partitions can be fetched concurrently. Each of these tasks creates its own connection to the database and reads its share of the data.

    Note: lowerBound and upperBound do not filter the data; they only help Spark decide the stride of each partition.

         Also, partitionColumn must be a numeric, date, or timestamp column from the table.
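
    The partitioning logic described above can be sketched in plain Python. This is an illustrative reconstruction of the stride arithmetic, not Spark’s actual source; the column name and bounds are the ones from the example:

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Build one WHERE predicate per partition, Spark-JDBC style."""
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        if i == 0:
            # First partition also picks up NULLs and anything below the lower bound.
            predicates.append(f"{column} < {start + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended so rows above the upper bound are not lost.
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {start + stride}")
    return predicates

preds = partition_predicates("partitionColumn", 0, 1000, 10)
print(preds[0])   # partitionColumn < 100 OR partitionColumn IS NULL
print(preds[-1])  # partitionColumn >= 900
```

    This also makes it clear why lowerBound and upperBound do not filter anything: they only position the range boundaries.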

    There are also options that can optimize the write operation. One is batchsize: the JDBC batch size, which determines how many rows to insert per round trip. Increasing it can improve the performance of JDBC drivers; this option applies only to writing. Another helpful option is truncate, a JDBC-writer option: when SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
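
    For instance, the write from the snippet above could be tuned like this. This is an untested fragment; it assumes the same jdbc_url, credentials, and running Postgres instance as before:

```
table_data_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "covid_data_output") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("numPartitions", num_partitions) \
    .option("batchsize", 10000) \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()
```

    With truncate set and overwrite mode enabled, repeated runs reuse the existing table instead of recreating it on every write.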

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 104ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/4ms)
    22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    We have seen how to read and write data in parallel with Spark. But Spark is not the only way to connect to databases, right? There are other ways we can access databases and try to achieve parallel reads and writes. We will discuss these in the following sections, focusing mainly on reading and writing from Python.

    Single-thread Python program:

    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username,password=password)
        data = pd.DataFrame(db_client.read(query))
        print(data)

    To integrate Postgres with Python, there are different libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the psycopg2 library, the current major version of the Psycopg adapter. You can install it using pip or any way you are comfortable with.

    To connect with the Postgres database, we need a hostname, port, database name, username, and password. We store all these details as attributes of the class. The create_conn method forms a connection with the Postgres database using the connect() method of the psycopg2 module and returns a connection object.
    In the read method, we call create_conn to get a connection object and use it to create a cursor. The cursor is bound to the connection for its lifetime and executes all commands or queries on the database. Through this cursor, we execute a read query on the database, fetch the returned data using the fetchall() method, and then close the cursor and the connection.

    To run the program, we specify the database details and the query. Next, we create a PostgresDbClient object and call its read method. The read method returns the data, which we convert into a tabular format using pandas.

    This implementation is very straightforward: the program runs as a single process and fetches all the data using the system’s resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30 percent of the CPU and memory, the remaining 70 percent sits idle. We can maximize this usage by other means, like multithreading or multiprocessing.

    Output of Program:

    Connecting to the Postgres database...
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
                                  0              1    2   3  4  5     6           7
    0   Andaman and Nicobar Islands        Unknown   33  11  0  0  None  2020-04-26
    1                Andhra Pradesh      Anantapur   53  14  4  0  None  2020-04-26
    2                Andhra Pradesh       Chittoor   73  13  0  0  None  2020-04-26
    3                Andhra Pradesh  East Godavari   39  12  0  0  None  2020-04-26
    4                Andhra Pradesh         Guntur  214  29  8  0  None  2020-04-26
    ..                          ...            ...  ...  .. .. ..   ...         ...
    95                        Bihar         Araria    1   0  0  0  None  2020-04-30
    96                        Bihar          Arwal    4   0  0  0  None  2020-04-30
    97                        Bihar     Aurangabad    8   0  0  0  None  2020-04-30
    98                        Bihar          Banka    3   0  0  0  None  2020-04-30
    99                        Bihar      Begusarai   11   5  0  0  None  2020-04-30
    
    [100 rows x 8 columns]
    
    Process finished with exit code 0

    Multi-thread Python program:

    In the previous section, we discussed the drawback of a single process and single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.

    What is a process?

    When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.

    What is a thread?

    A thread is a sequential flow of execution within a process. Every process has at least one thread, usually called the main thread. Unlike separate processes, multiple threads of the same process share its computing and memory resources.

    What is multithreading?

    This is when a process has multiple threads in addition to the main thread, and these threads run independently but concurrently, using the same computing and memory resources associated with the process. Such a program is called a multithreaded program. Multithreading uses resources efficiently, which helps maximize performance. (In CPython, the global interpreter lock limits threads for CPU-bound work, but database reads are I/O-bound, so threads still help here.)

    What is multiprocessing?

    When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is typically achieved by running separate processes on multiple processors or cores.

    Let’s get back to our program. Here you can see we have the connection and read methods, exactly the same as in the previous section. We also have one new function, get_thread(). Be careful with the distinction: a method belongs to the class, while this function does not. The get_thread() function is global and acts as a wrapper for calling the read method of PostgresDbClient; it keeps each thread’s work (logging, reading, framing the result) in one place, and we pass it as the target when creating threads. Don’t worry if this seems odd; it is just how the code is structured.

    To run the program, we specify the Postgres database details and the queries. In the previous approach, we fetched all the data from the table with a single thread. In this approach, the plan is to fetch one day of data per thread so that we can maximize resource utilization: each query reads one day’s worth of data from the table on its own thread, so 5 queries fetch 5 days of data, with 5 threads running concurrently.

    To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. Thread() creates a new thread and returns its object; at this point the thread has been created but not yet started. To start it, we call the start() method. In our program, we start 5 threads. If you execute this program multiple times, you will see different orderings: some data is fetched earlier, some later, and on the next run the order will differ again. This is because thread scheduling is handled by the operating system, and the output order depends on which thread the OS gives resources to and when. To understand how this works, you would need to dig into operating systems concepts.

    In our use case, we are just printing the data to the console. There are multiple ways to store it instead. One simple way is to define a global variable and store the result in it, but then we need synchronization, since multiple threads accessing the global variable can lead to race conditions. Another way is to subclass the Thread class and store the result in a class variable; again, you will need to make sure you achieve synchronization.

    So, whenever you want to store the data in a shared variable by any of these methods, you need synchronization, and synchronization pushes the threads toward sequential execution, which is not what we are looking for.
    To avoid synchronization, we can write the data directly to the target: the same thread that reads the data writes it back to the target database. This way, we avoid shared state and still store the data for future use. The function can look as below, where db_client.write(data) stands for a method that writes the data to a database (not shown here).

    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        db_client.write(data)
        print(f"Stopping thread id {thread_id}")
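
    The write method itself is not part of the client shown in this post. A minimal sketch of what it could look like follows, using SQLite from the standard library as a stand-in for Postgres so the sketch runs anywhere; with psycopg2 the pattern is the same, using executemany with an INSERT statement and %s placeholders. The class, table, and column names are illustrative:

```python
import sqlite3
import pandas as pd

class SqliteDbClient:
    """Hypothetical stand-in for PostgresDbClient, with a write method."""
    def __init__(self, db_path=":memory:"):
        # check_same_thread=False lets multiple threads share this connection
        self.conn = sqlite3.connect(db_path, check_same_thread=False)

    def write(self, data, table="covid_data_output"):
        placeholders = ", ".join("?" * len(data.columns))
        # astype(object) boxes numpy scalars into plain Python values
        # that the database driver can bind.
        records = data.astype(object).to_numpy().tolist()
        cursor = self.conn.cursor()
        cursor.executemany(f"INSERT INTO {table} VALUES ({placeholders})", records)
        self.conn.commit()  # make the inserts durable
        cursor.close()

db_client = SqliteDbClient()
db_client.conn.execute("CREATE TABLE covid_data_output (state TEXT, confirmed INTEGER)")
frame = pd.DataFrame({"state": ["Uttarakhand", "West Bengal"], "confirmed": [3259, 7705]})
db_client.write(frame)
print(db_client.conn.execute("SELECT COUNT(*) FROM covid_data_output").fetchone()[0])  # 2
```

    Because each thread calls write on its own slice of data, no shared variable (and no lock) is needed.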

    Python Program:

    import threading
    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        print(f"Stopping thread id {thread_id}")
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        partition_column = 'date'
        lower_bound = '2020-04-26'
        upper_bound = '2020-04-30'
        num_partitions = 5
    
    
        # one non-overlapping date range per thread, together covering lower_bound..upper_bound
        query1 = f"select * from {table_name} where {partition_column} >= '2020-04-26' and {partition_column} < '2020-04-27'"
        query2 = f"select * from {table_name} where {partition_column} >= '2020-04-27' and {partition_column} < '2020-04-28'"
        query3 = f"select * from {table_name} where {partition_column} >= '2020-04-28' and {partition_column} < '2020-04-29'"
        query4 = f"select * from {table_name} where {partition_column} >= '2020-04-29' and {partition_column} < '2020-04-30'"
        query5 = f"select * from {table_name} where {partition_column} >= '2020-04-30' and {partition_column} < '2020-05-01'"
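Writing each partition predicate by hand is error-prone; a small helper can generate non-overlapping ranges from the bounds, similar to how Spark's JDBC reader splits [lowerBound, upperBound) into numPartitions strides. This is a sketch; the helper name and daily granularity are assumptions:

```python
from datetime import date, timedelta

def build_partition_queries(table, column, lower, upper, num_partitions):
    """Split the inclusive [lower, upper] date range into num_partitions
    contiguous, non-overlapping WHERE clauses, one per thread."""
    total_days = (upper - lower).days + 1
    stride = max(total_days // num_partitions, 1)
    queries, start = [], lower
    for i in range(num_partitions):
        # the last partition absorbs any remainder so the range is fully covered
        end = upper + timedelta(days=1) if i == num_partitions - 1 else start + timedelta(days=stride)
        queries.append(
            f"select * from {table} where {column} >= '{start}' and {column} < '{end}'"
        )
        start = end
    return queries

queries = build_partition_queries("covid_data", "date",
                                  date(2020, 4, 26), date(2020, 4, 30), 5)
```

Each generated query can then be handed to its own thread exactly like the hand-written ones.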
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username, password=password)
        threads = []
        for thread_id, query in enumerate([query1, query2, query3, query4, query5], start=1):
            t = threading.Thread(target=get_thread, args=(thread_id, db_client, query))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()  # wait for all reader threads to finish
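Starting and tracking Thread objects by hand also gets unwieldy as the partition count grows; `concurrent.futures.ThreadPoolExecutor` from the standard library handles the pool, the joins, and the result collection for us. A sketch, where `read_fn` stands in for a call such as `db_client.read`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_queries_in_parallel(read_fn, queries, max_workers=5):
    """Run read_fn(query) for every query on a thread pool and
    return the results keyed by the query's position in the list."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(read_fn, q): i for i, q in enumerate(queries)}
        for future in as_completed(futures):
            results[futures[future]] = future.result()  # re-raises worker errors
    return results

# demo with a stub reader; in this blog's setup read_fn would be db_client.read
demo = run_queries_in_parallel(lambda q: f"rows for [{q}]", ["q1", "q2", "q3"])
```

The `with` block joins all workers before returning, so there is no risk of the main thread racing ahead of the readers.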

    Output of Program:

    Starting thread id 1
    Connecting to the Postgres database...
    Starting thread id 2
    Connecting to the Postgres database...
    Starting thread id 3
    Connecting to the Postgres database...
    Starting thread id 4
    Connecting to the Postgres database...
    Starting thread id 5
    Connecting to the Postgres database...
    Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
    Reading data !!!
    
    Reading data !!!
    
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Thread 2 data 
    Thread 3 data 
    Thread 1 data 
    Thread 5 data 
    Thread 4 data 
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-27
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-27
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-27
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-27
    4                Andhra Pradesh          Guntur  237  ...  0  None  2020-04-27
    5                Andhra Pradesh         Krishna  210  ...  0  None  2020-04-27
    6                Andhra Pradesh         Kurnool  292  ...  0  None  2020-04-27
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-27
    8                Andhra Pradesh  S.P.S. Nellore   79  ...  0  None  2020-04-27
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-27
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-27
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-27
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-27
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-27
    14                        Assam         Unknown   36  ...  0  None  2020-04-27
    15                        Bihar           Arwal    4  ...  0  None  2020-04-27
    16                        Bihar      Aurangabad    7  ...  0  None  2020-04-27
    17                        Bihar           Banka    2  ...  0  None  2020-04-27
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-27
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-27
    
    [20 rows x 8 columns]
    Stopping thread id 2
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-26
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-26
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-26
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-26
    4                Andhra Pradesh          Guntur  214  ...  0  None  2020-04-26
    5                Andhra Pradesh         Krishna  177  ...  0  None  2020-04-26
    6                Andhra Pradesh         Kurnool  279  ...  0  None  2020-04-26
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-26
    8                Andhra Pradesh  S.P.S. Nellore   72  ...  0  None  2020-04-26
    9                Andhra Pradesh      Srikakulam    3  ...  0  None  2020-04-26
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-26
    11               Andhra Pradesh   West Godavari   51  ...  0  None  2020-04-26
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-26
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-26
    14                        Assam         Unknown   36  ...  0  None  2020-04-26
    15                        Bihar           Arwal    4  ...  0  None  2020-04-26
    16                        Bihar      Aurangabad    2  ...  0  None  2020-04-26
    17                        Bihar           Banka    2  ...  0  None  2020-04-26
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-26
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-26
    
    [20 rows x 8 columns]
    Stopping thread id 1
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-28
    1                Andhra Pradesh       Anantapur   54  ...  0  None  2020-04-28
    2                Andhra Pradesh        Chittoor   74  ...  0  None  2020-04-28
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-28
    4                Andhra Pradesh          Guntur  254  ...  0  None  2020-04-28
    5                Andhra Pradesh         Krishna  223  ...  0  None  2020-04-28
    6                Andhra Pradesh         Kurnool  332  ...  0  None  2020-04-28
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-28
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-28
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-28
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-28
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-28
    12               Andhra Pradesh   Y.S.R. Kadapa   65  ...  0  None  2020-04-28
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-28
    14                        Assam         Unknown   38  ...  0  None  2020-04-28
    15                        Bihar          Araria    1  ...  0  None  2020-04-28
    16                        Bihar           Arwal    4  ...  0  None  2020-04-28
    17                        Bihar      Aurangabad    7  ...  0  None  2020-04-28
    18                        Bihar           Banka    3  ...  0  None  2020-04-28
    19                        Bihar       Begusarai    9  ...  0  None  2020-04-28
    
    [20 rows x 8 columns]
    Stopping thread id 3
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-30
    1                Andhra Pradesh       Anantapur   61  ...  0  None  2020-04-30
    2                Andhra Pradesh        Chittoor   80  ...  0  None  2020-04-30
    3                Andhra Pradesh   East Godavari   42  ...  0  None  2020-04-30
    4                Andhra Pradesh          Guntur  287  ...  0  None  2020-04-30
    5                Andhra Pradesh         Krishna  246  ...  0  None  2020-04-30
    6                Andhra Pradesh         Kurnool  386  ...  0  None  2020-04-30
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-30
    8                Andhra Pradesh  S.P.S. Nellore   84  ...  0  None  2020-04-30
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-30
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-30
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-30
    12               Andhra Pradesh   Y.S.R. Kadapa   73  ...  0  None  2020-04-30
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-30
    14                        Assam         Unknown   43  ...  0  None  2020-04-30
    15                        Bihar          Araria    1  ...  0  None  2020-04-30
    16                        Bihar           Arwal    4  ...  0  None  2020-04-30
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-30
    18                        Bihar           Banka    3  ...  0  None  2020-04-30
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-30
    
    [20 rows x 8 columns]
    Stopping thread id 5
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-29
    1                Andhra Pradesh       Anantapur   58  ...  0  None  2020-04-29
    2                Andhra Pradesh        Chittoor   77  ...  0  None  2020-04-29
    3                Andhra Pradesh   East Godavari   40  ...  0  None  2020-04-29
    4                Andhra Pradesh          Guntur  283  ...  0  None  2020-04-29
    5                Andhra Pradesh         Krishna  236  ...  0  None  2020-04-29
    6                Andhra Pradesh         Kurnool  343  ...  0  None  2020-04-29
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-29
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-29
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-29
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-29
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-29
    12               Andhra Pradesh   Y.S.R. Kadapa   69  ...  0  None  2020-04-29
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-29
    14                        Assam         Unknown   38  ...  0  None  2020-04-29
    15                        Bihar          Araria    1  ...  0  None  2020-04-29
    16                        Bihar           Arwal    4  ...  0  None  2020-04-29
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-29
    18                        Bihar           Banka    3  ...  0  None  2020-04-29
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-29
    
    [20 rows x 8 columns]
    Stopping thread id 4
    
    Process finished with exit code 0

    Note that in this blog we have hardcoded the password as a plain string, which is not how passwords should be handled. Use a secrets manager, environment variables, or a .env file as the source of passwords instead; never hardcode passwords in a production environment.

    Conclusion 

    After going through the above blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also know the difference between multiprocessing and multithreading, and you can now choose the best way to carry out read and write operations on a database based on your requirements.

    In general, if you have a small amount of data, a simple single-threaded Python approach is enough. For a moderately large amount of data, use a multi-threaded approach or a single-partition Spark read. For huge volumes, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, the right choice depends on your requirements and the resources available.
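    For reference, the Spark multi-partition approach mentioned above boils down to a handful of JDBC options. A minimal sketch; the connection details mirror the Postgres example earlier and are assumptions:

```python
# options Spark's JDBC reader uses to split one table scan into parallel tasks
jdbc_options = {
    "url": "jdbc:postgresql://localhost:5432/postgres",
    "dbtable": "covid_data",
    "user": "postgres",
    "password": "pass@123",            # use a secret store in production
    "partitionColumn": "date",         # must be a numeric, date, or timestamp column
    "lowerBound": "2020-04-26",
    "upperBound": "2020-04-30",
    "numPartitions": "5",              # up to 5 concurrent JDBC connections
}

# with an active SparkSession this becomes:
# df = spark.read.format("jdbc").options(**jdbc_options).load()
```

    Spark derives one WHERE-clause stride per partition from `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions`, much like the hand-built queries in the threading example.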

  • A Beginner’s Guide to Python Tornado

    The web is a big place now. We need to support thousands of clients at a time, and this is where Tornado comes in. Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed.

    Tornado uses non-blocking network I/O, which lets it handle thousands of active server connections. It is a saviour for applications that use long polling or maintain a large number of active connections.

    Tornado is not like most Python frameworks: it is not based on WSGI, although it supports some WSGI features through the `tornado.wsgi` module. Its event-loop design makes request execution fast.

    What is a Synchronous Program?

    A synchronous function blocks, performs its computation, and returns once done. A function may block for many reasons: network I/O, disk I/O, mutexes, etc.

    Application performance depends on how efficiently the application uses CPU cycles, which is why blocking statements and calls must be taken seriously. Consider password-hashing functions like bcrypt, which by design use hundreds of milliseconds of CPU time, far more than a typical network or disk access. Since the CPU is busy rather than idle during such a call, making it asynchronous gains nothing.

    A function can be blocking in one context and non-blocking in another. In the context of Tornado, we generally care about blocking due to network and disk I/O, although all kinds of blocking should be minimized.
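    To make the distinction concrete, here is a sketch of both kinds of blocking: `time.sleep` stands in for a network call where the CPU sits idle, and repeated hashing stands in for bcrypt-style work where the CPU is busy and going asynchronous would not help. The function names are illustrative:

```python
import hashlib
import time

def blocking_on_io(delay):
    """Blocks the thread while the OS waits; the CPU is idle,
    so an asynchronous version could do other work meanwhile."""
    time.sleep(delay)

def blocking_on_cpu(rounds):
    """Blocks the thread while the CPU is busy; making this
    asynchronous gains nothing, since no other work can run."""
    digest = b"password"
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest

digest = blocking_on_cpu(10_000)
```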

    What is an Asynchronous Program?

    1) Single-threaded architecture:

        It cannot run computation-centric tasks in parallel.

    2) I/O concurrency:

        It can hand I/O tasks over to the operating system and continue with the next task, achieving I/O concurrency.

    3) epoll / kqueue:

        Underlying OS constructs that let an application receive events on file descriptors for I/O-specific tasks.

    4) Event loop:

        It uses epoll or kqueue to check whether any event has occurred, and executes the callbacks waiting for those network events.
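    The four pieces above fit together in any event-loop runtime. Here is a minimal sketch using the standard library's asyncio (which modern Tornado is built on) showing single-threaded I/O concurrency; `fake_fetch` is a stand-in for a real network call:

```python
import asyncio
import time

async def fake_fetch(name, delay):
    # await hands control back to the event loop while "the network" works
    await asyncio.sleep(delay)
    return name

async def main():
    # both waits overlap on one thread: total time is ~0.2s, not 0.3s
    return await asyncio.gather(fake_fetch("a", 0.2), fake_fetch("b", 0.1))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
```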

    Asynchronous vs Synchronous Web Framework:

    In the synchronous model, each request or task is handed to a thread or routine, and when it finishes, the result is returned to the caller. Managing things this way is easy, but creating new threads carries significant overhead.

    An asynchronous framework like Node.js, on the other hand, uses a single-threaded model, so there is much less overhead, but it adds complexity.

    Imagine thousands of requests arriving at a server that uses an event loop and callbacks. Until each request is processed, the server has to efficiently store and manage its state so that the callback result can be mapped back to the right client.
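    The bookkeeping described here is essentially a table mapping in-flight requests to their callbacks; a toy sketch of that dispatch step (all names are illustrative):

```python
# toy sketch: an event loop's table of in-flight requests
pending = {}  # request_id -> callback waiting for the result

def submit(request_id, callback):
    pending[request_id] = callback

def on_io_complete(request_id, result):
    # the loop pops the stored callback and hands it the result,
    # so replies reach the right client regardless of completion order
    pending.pop(request_id)(result)

replies = []
submit(1, lambda r: replies.append(("client-1", r)))
submit(2, lambda r: replies.append(("client-2", r)))
on_io_complete(2, "done")  # request 2 finishes first
on_io_complete(1, "done")
```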

    Node.js vs Tornado

    Most of these comparison points are tied to the programming language itself rather than the framework:

    • Node.js has one big advantage: all of its libraries are async. Python has plenty of available packages, but very few of them are asynchronous
    • Since Node.js is a JavaScript runtime, JS can be used for both the front end and the back end, so developers can keep a single codebase and share the same utility libraries
    • Google’s V8 engine makes Node.js faster than Tornado, but many Python libraries are written in C and can be faster alternatives.

    A Simple ‘Hello World’ Example

    import tornado.ioloop
    import tornado.web
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])
    
    if __name__ == "__main__":
        app = make_app()
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()

    Note: This example does not use any asynchronous feature.

    Using the AsyncHTTPClient module, we can make REST calls asynchronously.

    from tornado.httpclient import AsyncHTTPClient
    from tornado import gen
    
    @gen.coroutine
    def async_fetch_gen(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)

    As you can see, `yield http_client.fetch(url)` runs as a coroutine.

    Complex Example of Tornado Async

    Please have a look at Asynchronous Request handler.

    WebSockets Using Tornado:

    Tornado has a built-in package for WebSockets that can easily be used with coroutines to achieve concurrency. Here is one example:

    import logging
    import tornado.escape
    import tornado.ioloop
    import tornado.options
    import tornado.web
    import tornado.websocket
    from tornado.options import define, options
    from tornado.httpserver import HTTPServer
    
    define("port", default=8888, help="run on the given port", type=int)
    
    
    
    def isPrime(num):
        """
        Simple worker, standing in for a heavier I/O or network call
        """
        if num <= 1:
            return "is not a prime number"
        # check divisors up to sqrt(num); the original range(2, num // 2)
        # wrongly reported small composites like 4 as prime
        for i in range(2, int(num ** 0.5) + 1):
            if num % i == 0:
                return "is not a prime number"
        return "is a prime number"
    
    class Application(tornado.web.Application):
        def __init__(self):
            handlers = [(r"/chatsocket", TornadoWebSocket)]
            super(Application, self).__init__(handlers)
    
    class TornadoWebSocket(tornado.websocket.WebSocketHandler):
        clients = set()
    
        # enable cross domain origin
        def check_origin(self, origin):
            return True
    
        def open(self):
            TornadoWebSocket.clients.add(self)
    
        # when client closes connection
        def on_close(self):
            TornadoWebSocket.clients.remove(self)
    
        @classmethod
        def send_updates(cls, producer, result):
    
            for client in cls.clients:
    
                # check if result is mapped to correct sender
                if client == producer:
                    try:
                        client.write_message(result)
                    except:
                        logging.error("Error sending message", exc_info=True)
    
        def on_message(self, message):
            try:
                num = int(message)
            except ValueError:
                TornadoWebSocket.send_updates(self, "Invalid input")
                return
            TornadoWebSocket.send_updates(self, isPrime(num))
    
    def start_websockets():
        tornado.options.parse_command_line()
        app = Application()
        server = HTTPServer(app)
        server.listen(options.port)
        tornado.ioloop.IOLoop.current().start()
    
    
    
    if __name__ == "__main__":
        start_websockets()

    One can use any WebSocket client application to connect to the server; the message can be any integer. After processing, the client receives a reply saying whether the integer is prime.  
    Here is one more example of the actual async features of Tornado. Many will find it similar to Go’s goroutines and channels.

    In this example, we start worker(s) that listen on a `tornado.queues` queue. This queue is asynchronous and very similar to the one in the asyncio package.

    # Example 1
    from tornado import gen, queues
    from tornado.ioloop import IOLoop
    
    @gen.coroutine
    def consumer(queue, num_expected):
        for _ in range(num_expected):
            # heavy I/O or network task
            print('got: %s' % (yield queue.get()))
    
    
    @gen.coroutine
    def producer(queue, num_items):
        for i in range(num_items):
            print('putting %s' % i)
            yield queue.put(i)
    
    @gen.coroutine
    def main():
        """
        Starts producer and consumer and wait till they finish
        """
        yield [producer(q, producer_num_items), consumer(q, producer_num_items)]
    
    queue_size = 1
    producer_num_items = 5
    q = queues.Queue(queue_size)
    
    results = IOLoop.current().run_sync(main)
    
    
    # Output:
    # putting 0
    # putting 1
    # got: 0
    # got: 1
    # putting 2
    # putting 3
    # putting 4
    # got: 2
    # got: 3
    # got: 4
    
    
    # Example 2
    # Condition
    # A condition allows one or more coroutines to wait until notified.
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Condition
    
    my_condition = Condition()
    
    @gen.coroutine
    def waiter():
        print("I'll wait right here")
        yield my_condition.wait()
        print("Received notification now doing my things")
    
    @gen.coroutine
    def notifier():
        yield gen.sleep(60)
        print("About to notify")
        my_condition.notify()
        print("Done notifying")
    
    @gen.coroutine
    def runner():
        # Wait for waiter() and notifier() in parallel
        yield([waiter(), notifier()])
    
    results = IOLoop.current().run_sync(runner)
    
    
    # output:
    
    # I'll wait right here
    # About to notify
    # Done notifying
    # Received notification now doing my things

    Conclusion

    1) Asynchronous frameworks are not of much use when most of the computation is CPU-centric rather than I/O-bound.

    2) Thanks to the single-thread-per-core model and the event loop, Tornado can manage thousands of active client connections.

    3) Many say Django is too big, Flask is too small, and Tornado is just right. :)