ClickHouse is an open-source, column-oriented data warehouse for online analytical processing (OLAP). It is fast, scalable, flexible, cost-efficient, and easy to run. It delivers industry-leading query performance while significantly reducing storage requirements through innovative use of columnar storage and compression.
ClickHouse’s performance exceeds that of comparable column-oriented database management systems on the market. ClickHouse is a database management system, not a single database: it allows creating tables and databases at runtime, loading data, and running queries without reconfiguring or restarting the server.
ClickHouse processes anywhere from hundreds of millions to over a billion rows of data across clusters of hundreds of nodes. It utilizes all available hardware to process queries as fast as possible. The peak processing performance for a single query stands at more than two terabytes per second.
What makes ClickHouse unique?
Data Storage & Compression: ClickHouse is designed to work on regular hard drives but uses SSDs and additional RAM if available. Data compression plays a crucial role in achieving ClickHouse’s excellent performance: it provides general-purpose compression codecs as well as specialized codecs for specific kinds of data. These codecs trade CPU consumption against disk space differently and help ClickHouse outperform other databases.
High Performance: Using vectorized computation, the engine processes data in vectors (chunks of columns), which achieves high CPU efficiency. It supports parallel processing across multiple cores, so large queries are parallelized naturally. ClickHouse also supports distributed query processing: data resides across shards, which are used to execute a query in parallel.
Primary & Secondary Indexes: Data is physically sorted by the primary key, allowing low-latency extraction of specific values or ranges. Secondary indexes in ClickHouse let the database know when a query’s filtering conditions allow some data parts to be skipped entirely; they are therefore also called data-skipping indexes.
Support for Approximate Calculations: ClickHouse can trade accuracy for performance via approximate calculations. It provides aggregate functions for approximate counts of distinct values, medians, and quantiles. It can also run a query on a sample of the data, retrieving proportionally less data from disk to produce an approximate result.
Data Replication and Data Integrity Support: Data can be written to any available replica, and all the remaining replicas retrieve their copy in the background. The system keeps identical data on all replicas. Most failures are recovered automatically, or semi-automatically in complex scenarios.
But it can’t be all good, can it? There are some disadvantages to ClickHouse as well:
No full-fledged transactions.
Inability to efficiently and precisely change or remove previously inserted data. For example, to comply with GDPR, data can be cleaned up or modified, but only through batch deletes and updates.
ClickHouse is less efficient for point queries that retrieve individual rows by their keys, due to its sparse index.
ClickHouse against its contemporaries
So with all these distinctive features, how does ClickHouse compare with other industry-leading data storage tools? ClickHouse, being general-purpose, has a variety of use cases, each with its own pros and cons, so here is a high-level comparison against the best tools in their respective domains. Each tool has unique, domain-specific traits, and a comparison around those would not be fair; what we care about most is performance, scalability, cost, and other key attributes that can be compared irrespective of the domain. So here we go:
ClickHouse vs Snowflake:
With its decoupled storage & compute approach, Snowflake is able to segregate workloads and enhance performance. Snowflake’s search optimization service further improves point lookups but has additional costs attached to it. ClickHouse, on the other hand, with its local runtime and inherent support for multiple forms of indexing, drastically improves query performance.
Regarding scalability, ClickHouse’s on-prem roots make it slightly more challenging to scale than cloud-based Snowflake. Manually managing hardware, provisioning clusters, and migrating data is doable but tedious. One possible solution is to deploy ClickHouse in the cloud, a very good option that is cheaper and, frankly, the most viable.
ClickHouse vs Redshift:
Redshift is a managed, scalable cloud data warehouse that offers both provisioned and serverless options. Its RA3 nodes scale compute independently and cache the necessary data. Still, even with that, it does not isolate different workloads running on the same data, putting it on the lower end of decoupled compute & storage cloud architectures. ClickHouse’s local runtime, meanwhile, is one of the fastest.
Both Redshift and ClickHouse are columnar and sort data, allowing queries to read only the specific data they need. But deploying ClickHouse is cheaper, and although Redshift is tailored to be a ready-to-use tool, ClickHouse is the better choice if you are not entirely dependent on Redshift’s conveniences such as managed configuration, backup & monitoring.
ClickHouse vs InfluxDB:
InfluxDB, an open-source NoSQL database written in Go, is one of the most popular choices for dealing with time-series data and analysis. Despite being a general-purpose analytical DB, ClickHouse provides competitive write performance.
ClickHouse’s data structures, like AggregatingMergeTree, allow real-time data to be stored in a pre-aggregated format, which puts it on par with TSDBs in performance. It is significantly faster for heavy queries and comparable for light queries.
ClickHouse vs PostgreSQL:
Postgres is another very versatile DB and is thus widely used for various use cases, just like ClickHouse. Postgres, however, is an OLTP DB, so unlike ClickHouse, analytics is not its primary aim, though it is still used for analytics to a certain extent.
For transactional workloads, ClickHouse’s columnar nature puts it below Postgres. But for analytical capabilities, even after tuning Postgres to its maximum potential (e.g., using materialized views, indexing, cache sizes, and buffers), ClickHouse is ahead.
ClickHouse vs Apache Druid:
Apache Druid is an open-source data store that is primarily used for OLAP. Both Druid & ClickHouse are very similar in terms of their approaches and use cases but differ in terms of their architecture. Druid is mainly used for real-time analytics with heavy ingestions and high uptime.
Unlike Druid, ClickHouse has a much simpler deployment: ClickHouse can be deployed on a single server, while a Druid setup needs multiple types of nodes (master, broker, ingestion, etc.). ClickHouse, with its SQL-like query language, also provides better flexibility and is more performant when the deployment is small.
To summarize the differences between ClickHouse and other data warehouses:
ClickHouse Engines
Depending on the type of your table (internal or external), ClickHouse provides an array of engines that connect to different data storages and determine the way data is stored and accessed, along with other interactions on it.
These engines are mainly categorized into two types:
Database Engines:
These allow us to work with different databases & tables. By default, ClickHouse uses the Atomic database engine, which provides configurable table engines and an SQL dialect. Popular database engines include PostgreSQL, MySQL, and so on.
Table Engines:
These determine
how and where data is stored
where to read/write it from/to
which queries it supports
use of indexes
concurrent data access and so on.
These engines are further classified into families based on the above parameters:
MergeTree Engines:
This is the most universal and functional family of table engines for high-load tasks. Engines of this family support quick data insertion with subsequent background data processing. They also support data replication, partitioning, secondary data-skipping indexes, and other features. Following are some of the popular engines in this family:
MergeTree
SummingMergeTree
AggregatingMergeTree
MergeTree engines with indexing and partitioning support allow data to be processed at a tremendous speed. These can also be leveraged to form materialized views that store aggregated data further improving the performance.
Log Engines:
These are lightweight engines with minimum functionality. These work the best when the requirement is to quickly write into many small tables and read them later as a whole. This family consists of:
Log
StripeLog
TinyLog
These engines append data to the disk in a sequential fashion and support concurrent reading. They do not support indexing, updating, or deleting and hence are only useful when the data is small, sequential, and immutable.
Integration Engines:
These are used for communicating with other data storage and processing systems. They support:
JDBC
MongoDB
HDFS
S3
Kafka and so on.
Using these engines, we can import and export data from external sources. With engines like Kafka, we can ingest data directly from a topic into a ClickHouse table, and with the S3 engine, we can work directly with S3 objects.
Special Engines:
ClickHouse offers some special engines that are specific to the use case. For example:
MaterializedView
Distributed
Merge
File and so on.
These special engines have their own quirks; e.g., with File we can export data to a file, update data in the table by updating the file, etc.
Summary
We learned that ClickHouse is a very powerful and versatile tool: one with stellar performance that is feature-packed, very cost-efficient, and open-source. We saw a high-level comparison of ClickHouse with some of the best choices across an array of use cases. Although it ultimately comes down to how specific and intense your use case is, ClickHouse, with its generic nature, measures up pretty well on multiple occasions.
ClickHouse’s applicability in web analytics, network management, log analysis, time-series analysis, asset valuation in financial markets, and security threat identification makes it tremendously versatile. Consistently solving business problems with low-latency responses over petabytes of data, ClickHouse is indeed one of the fastest data warehouses out there.
The amount of data in our world has been exploding day by day. Processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified, distributed data processing engine for Big Data: it lets you process Big Data efficiently by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data, which is millions of gigabytes. It processes data in memory wherever possible, which makes it fast.
We talked about processing Big Data in Spark, but Spark does not store any data itself the way file systems or databases do. So, to process data in Spark, we must read data from different data sources, clean or process it, and store the result in one of the target data sources. Data sources can be files, APIs, databases, or streams.
Database management systems have been around for decades. Many applications generate huge amounts of data and store it in database management systems, and a lot of the time, we need to connect Spark to the database and process that data.
In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on reading/writing data from/to the database using different methods, which will help us handle terabytes of data in an efficient manner.
Reading / Writing data from/to Database using Spark:
To read data from or write data to the database, we need to perform a few basic steps regardless of the programming language or framework we are using. What follows is an overview of the steps to read data from a database.
Step 1: Register Driver or Use Connector
Get the respective driver of your database and register the driver, or use the connector to connect to the database.
Step 2: Make a connection
Next, the driver or connector makes a connection to the database.
Step 3: Run query statement
Using the connection created in the previous step, execute the query, which will return the result.
Step 4: Process result
Process the result obtained in the previous step as per your requirements.
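These four steps look much the same in any client library. Here is a minimal sketch using Python's built-in sqlite3 module as a stand-in database (with Postgres you would first install and register a driver such as psycopg2; the table and rows below are made up for illustration):

```python
import sqlite3

# Steps 1 & 2: sqlite3 ships its own driver, and connect() makes the
# connection (here to a throwaway in-memory database).
conn = sqlite3.connect(":memory:")

# A tiny made-up table so the query has something to read.
conn.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")
conn.executemany("INSERT INTO covid_data VALUES (?, ?)",
                 [("Uttarakhand", 3259), ("West Bengal", 7705)])

# Step 3: run the query statement through a cursor.
cursor = conn.execute("SELECT state, confirmed FROM covid_data ORDER BY confirmed")

# Step 4: process the result -- here we simply collect the rows.
rows = cursor.fetchall()
print(rows)  # [('Uttarakhand', 3259), ('West Bengal', 7705)]

conn.close()
```

The same connect / execute / fetch / process rhythm reappears in the psycopg2 programs later in this post.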
We will use a dataset that contains details of COVID patients across all states. It has fields such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.
You can load this dataset in any of the databases you work with and can try out the entire discussion practically.
The following image shows ten records of the entire dataset.
Spark provides an API to read data from a database, and it is very simple to use. First of all, we need to create a Spark session and then add the database driver to Spark. The driver can be added through the program itself, or via the shell.
The first line of code imports the SparkSession class. This is the entry point to programming Spark with the Dataset and DataFrame API.
From the fifth to the ninth line of the above code, we create a Spark session on a local system with four cores, which will be used for interaction with our Spark application. We specify the name of our application using appName(), which in our case is ‘Databases’; this app name will be shown on the web UI for our cluster. Next, we can specify any configuration for the Spark application using config(). In our case, we have specified the driver for the Postgres database, which will be used to create a connection to it. You can specify the driver of any of the available databases.
To connect to the database, we must have a hostname, port, database name, username, and password. Those details are on lines 10 through 16 of the above code.
Refer to code lines 19 to 28 in the above snippet. Up to this point, we have our Spark session and all the information we need to connect to the database. Using the Spark read API, we read the data from the database. This creates a connection to the Postgres database from one of the cores we have allocated to the Spark application and reads the data into the table_data_df dataframe. Even if we have multiple cores for our application, it will still create only one connection from one of the cores; the rest of the cores will not be utilized. We will discuss how to utilize all the cores shortly.
Refer to code lines 29 to 38 in the above snippet. We have the data now, so let’s try to write it to the database using the Spark write API. This, too, creates only one connection to the database from one of the cores allocated to the Spark application. Even if we have more cores available, the above code still uses only one.
Output of Program:
/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
    confs: [default]
    found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 113ms :: artifacts dl 3ms
    :: modules in use:
    org.postgresql#postgresql;42.2.8 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/5ms)
22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|        state|         district|confirmed|recovered|deceased|other|tested|      date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
|  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
|  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
|  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
|  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
|  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
|  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
|  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
|  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
|  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
|  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
|  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
|  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
|  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
|  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
|  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
|  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
|  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
|  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
|  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows

Process finished with exit code 0
As promised in the last section, let’s discuss how to optimize resource utilization. So far we had only one connection, utilizing very limited resources and leaving the rest idle. To get over this, the Spark read and write APIs accept a few extra attributes: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is specified, and numPartitions must be specified as well. They describe how to partition the table when reading in parallel from multiple workers. For each partition, an individual core with its own connection performs the reads or writes, thus making the database operations parallel.
This is a far more efficient way of reading and writing data from databases in Spark than doing it with one partition.
Partitions are decided by the Spark API in the following way.
Let’s consider an example where:
lowerBound: 0
upperBound: 1000
numPartitions: 10
Stride is equal to 100, and partitions correspond to the following queries:
SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100
SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200
…
…
SELECT * FROM table WHERE partitionColumn >= 900
BETWEEN here is exclusive on the upper bound.
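The stride arithmetic above can be checked with a few lines of plain Python. This only mirrors the boundary logic; the SQL Spark actually generates also routes NULL values into the first partition:

```python
lower_bound, upper_bound, num_partitions = 0, 1000, 10
stride = (upper_bound - lower_bound) // num_partitions  # 100

queries = []
for i in range(num_partitions):
    start = lower_bound + i * stride
    if i < num_partitions - 1:
        # Half-open range [start, start + stride).
        queries.append(f"SELECT * FROM table WHERE partitionColumn >= {start} "
                       f"AND partitionColumn < {start + stride}")
    else:
        # The last partition is left open-ended so rows at or above
        # upperBound are not lost.
        queries.append(f"SELECT * FROM table WHERE partitionColumn >= {start}")

print(stride)       # 100
print(queries[-1])  # SELECT * FROM table WHERE partitionColumn >= 900
```

Each of the ten generated queries is handed to its own task, which is what makes the read parallel.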
Now we have data in multiple partitions. Each executor can have one or more partitions based on the cluster configuration. Suppose we have 10 cores and 10 partitions: one partition of data can be fetched by one core, so the 10 partitions can be fetched in parallel. Each of these tasks creates its own connection to the database and reads its slice of the data.
Note: lowerBound and upperBound do not filter the data; they just help Spark decide the stride. partitionColumn must be a numeric, date, or timestamp column from the table.
There are also attributes that can be used to optimize the write operation. One is batchsize, the JDBC batch size, which determines how many rows to insert per round trip; it can help the performance of JDBC drivers and applies only to writing. Another, truncate, is a JDBC-writer-related option: when SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
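The idea behind batchsize can be illustrated with a rough local sketch using Python's built-in sqlite3 as a stand-in target (the table and rows are invented; with Spark you simply set the option on the JDBC writer and the driver does the chunking for you):

```python
import sqlite3

rows = [(f"state_{i}", i) for i in range(1000)]  # made-up data
batch_size = 200  # analogous to the JDBC "batchsize" option

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")

# One round trip per batch of 200 rows instead of one round trip per row.
for start in range(0, len(rows), batch_size):
    conn.executemany("INSERT INTO covid_data VALUES (?, ?)",
                     rows[start:start + batch_size])
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM covid_data").fetchone()[0]
print(count)  # 1000
```

Over a network, fewer round trips is usually the difference between a write that takes minutes and one that takes hours.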
Output of Program:
/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
    confs: [default]
    found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 104ms :: artifacts dl 3ms
    :: modules in use:
    org.postgresql#postgresql;42.2.8 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/4ms)
22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|        state|         district|confirmed|recovered|deceased|other|tested|      date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
|  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
|  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
|  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
|  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
|  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
|  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
|  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
|  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
|  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
|  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
|  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
|  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
|  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
|  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
|  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
|  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
|  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
|  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
|  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows

Process finished with exit code 0
We have seen how to read and write data in Spark. But Spark is not the only way to connect to databases, right? There are multiple other ways we can access databases and try to achieve parallel reads and writes. We will discuss them in the following sections, focusing mainly on reading and writing from Python.
Single Thread Python Program:
import traceback

import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print(f"Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print(f"Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)
    data = pd.DataFrame(db_client.read(query))
    print(data)
To integrate Postgres with Python, there are different libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the psycopg2 library, the current major version of the Psycopg adapter. You can install it using pip or any other way you are comfortable with.
To connect to the Postgres database, we need the hostname, port, database name, username, and password; we store all these details as attributes in the class. The create_conn method forms a connection to the Postgres database using the connect() method of the psycopg2 module and returns a connection object. In the read method, we call create_conn to get a connection object and use it to create a cursor. This cursor is bound to the connection for its lifetime and executes all commands or queries on the database. Using the cursor, we execute a read query; the data it returns can then be fetched using the fetchall() method. Finally, we close the cursor and the connection.
To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the data, which we convert into relational (tabular) format using pandas.
This implementation is very straightforward: the program creates one process in our system and fetches all the data using the system’s resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30% of the available CPU and memory, the remaining 70% sits idle. We can maximize resource usage through other means, like multithreading or multiprocessing.
Output of Program:
Connecting to the Postgres database...
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
    0                            1               2        3     ...    7
0   Andaman and Nicobar Islands  Unknown         331100   None  2020-04-26
1   Andhra Pradesh               Anantapur       531440   None  2020-04-26
2   Andhra Pradesh               Chittoor        731300   None  2020-04-26
3   Andhra Pradesh               East Godavari   391200   None  2020-04-26
4   Andhra Pradesh               Guntur          2142980  None  2020-04-26
..  ...                          ...             ...      ...   ...
95  Bihar                        Araria          1000     None  2020-04-30
96  Bihar                        Arwal           4000     None  2020-04-30
97  Bihar                        Aurangabad      8000     None  2020-04-30
98  Bihar                        Banka           3000     None  2020-04-30
99  Bihar                        Begusarai       11500    None  2020-04-30
[100 rows x 8 columns]
Process finished with exit code 0
Multi-Thread Python Program:
In the previous section, we discussed the drawback of a single process and single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.
What is a process?
When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.
What is a thread?
A thread is a sequential flow of execution. Every process has at least one thread, usually called the main thread. Unlike separate processes, multiple threads within a process share the same computing and memory resources.
What is multithreading?
This is when a process has multiple threads alongside the main thread, running independently but concurrently, using the same computing and memory resources associated with the process. Such a program is called a multithreaded program. Multithreading uses resources very efficiently, which helps maximize performance.
What is multiprocessing?
When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor.
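A minimal multiprocessing sketch in Python, using the standard multiprocessing module (the square function is a made-up stand-in for CPU-bound work on one slice of the data):

```python
from multiprocessing import Pool


def square(n):
    # Made-up stand-in for CPU-bound work on one chunk of data.
    return n * n


def run_pool():
    # Four worker processes, each with its own interpreter and memory.
    with Pool(processes=4) as pool:
        return pool.map(square, range(8))


if __name__ == "__main__":
    print(run_pool())  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Because each worker is a separate process, there is no shared memory to synchronize, but handing data between processes costs serialization; that trade-off is why the rest of this post sticks to threads for I/O-bound database reads.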
Let’s get back to our program. Here you can see we have the connection and read methods; these two are exactly the same as in the previous section. What’s new is the get_thread() function. Note that it is a function, not a method: it is not part of the PostgresDbClient class. This global get_thread() function acts as a wrapper for calling the read method of PostgresDbClient, giving each thread a simple top-level callable to run. Don’t worry if this seems confusing; it is just how we structure the code.
To run the program, we specify the Postgres database details and the queries. In the previous approach, we fetched all the data from the table with a single thread. In this approach, the plan is to fetch one day of data per thread so that we maximize resource utilization: each query reads one day’s worth of data from the table using one thread, so 5 queries fetch 5 days of data with 5 threads running concurrently.
To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. This creates a new thread object, but the thread has not yet started; to start it, we call the start() method. In our program, we start 5 threads. If you execute this program multiple times, you will end up with different orderings: some data will be fetched earlier, some later, and on the next execution the order will differ again. This is because resource scheduling is done by the operating system, and the output depends on which thread the OS gives resources to at any moment. If you want to know how this works, you will need to dig into operating systems concepts.
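A minimal sketch of this pattern (the sleep stands in for a database read; run it a few times and the Starting/Stopping lines interleave differently):

```python
import random
import threading
import time

finished = []  # list.append is atomic in CPython, so this demo needs no lock


def get_thread(thread_id):
    print(f"Starting thread id {thread_id}")
    time.sleep(random.random() / 100)  # stand-in for a database read
    finished.append(thread_id)
    print(f"Stopping thread id {thread_id}")


threads = [threading.Thread(target=get_thread, args=(i,)) for i in range(5)]
for t in threads:
    t.start()  # all five now run concurrently; print order varies per run
for t in threads:
    t.join()   # block until every thread has finished

print(sorted(finished))  # [0, 1, 2, 3, 4]
```

The join() calls at the end are what keep the main thread alive until all workers are done, which the full program also relies on.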
In our use case, we are just printing the data to the console. To store the data instead, there are multiple options. One simple way is to define a global variable and store the results in it, but then we need synchronization, because multiple threads accessing the same global variable can cause race conditions. Another way is to extend the Thread class with a custom class and define a class variable to hold the data. Again, here too, you will need to make sure access is synchronized.
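The first option above, a shared global variable guarded for synchronization, can be sketched with threading.Lock; the fetch_and_store function and its fake rows are illustrative stand-ins for real database reads:

```python
import threading

results = []                      # shared global, guarded by a lock
results_lock = threading.Lock()

def fetch_and_store(thread_id):
    data = [thread_id] * 3        # stand-in for rows read from the database
    with results_lock:            # serialize access to avoid a race condition
        results.extend(data)

threads = [threading.Thread(target=fetch_and_store, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))  # 15
```

The lock makes the extend calls safe, but it also means threads queue up at that point, which is exactly the sequential bottleneck the next paragraph describes.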
So, whenever you store the data in a shared variable, by any of these methods, you need synchronization, and synchronization pushes the threads toward sequential execution, which is not what we are looking for. To avoid it, we can write the data directly to the target: the same thread that reads the data writes it back to the target database. This way, we avoid shared state and still store the data for future use. The function can look as below, where db_client.write(data) is a function that writes the data to a database.
def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    db_client.write(data)
    print(f"Stopping thread id {thread_id}")
Python Program:
import threading
import traceback

import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    print(f"Stopping thread id {thread_id}")


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    partition_column = 'date'
    lower_bound = '2020-04-26'
    upper_bound = '2020-04-30'
    num_partitions = 5
    # Each query covers one day, so the five threads read disjoint slices.
    query1 = f"select * from {table_name} where {partition_column} >= '2020-04-26' and {partition_column} < '2020-04-27'"
    query2 = f"select * from {table_name} where {partition_column} >= '2020-04-27' and {partition_column} < '2020-04-28'"
    query3 = f"select * from {table_name} where {partition_column} >= '2020-04-28' and {partition_column} < '2020-04-29'"
    query4 = f"select * from {table_name} where {partition_column} >= '2020-04-29' and {partition_column} < '2020-04-30'"
    query5 = f"select * from {table_name} where {partition_column} >= '2020-04-30' and {partition_column} < '2020-05-01'"

    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)

    x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
    x1.start()
    x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
    x2.start()
    x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
    x3.start()
    x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
    x4.start()
    x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
    x5.start()
Note that in this blog, we have used a hardcoded password string, which is definitely not the way to handle passwords. We should use secrets managers, .env files, etc., as the source of passwords; we never hardcode passwords in a production environment.
Conclusion
After going through the above blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading concepts. You also now know what multiprocessing and multithreading are, and you are able to analyze the best way to carry out read and write operations on a database based on your requirements.
In general, if you have a small amount of data, you can use a simple single-threaded Python approach to read and write it. If you have a relatively large amount of data, you can use a multithreaded approach or a single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, the choice of approach depends on your requirements and the resources available to you.