Tag: kafka

  • Confluent Kafka vs. Amazon Managed Streaming for Apache Kafka (AWS MSK) vs. on-premise Kafka

    Overview:

    In the evolving landscape of distributed streaming platforms, Kafka has become the foundation of real-time data processing. Organizations now have several deployment options, so choosing among Confluent Kafka, AWS MSK, and on-premise Kafka is an important decision. This blog provides an overview and comparison of these three Kafka deployment methods to help readers decide based on their needs.

    Here’s a list of key parameters to consider when comparing Confluent Kafka, AWS MSK, and on-premise Kafka:

    1. Deployment and Management
    2. Communication
    3. Scalability
    4. Performance
    5. Cost Considerations
    6. Security

    So, let’s go through all the parameters one by one.

    Deployment and Management: 

    Deployment and management refer to the processes involved in setting up, configuring, and overseeing the operation of a Kafka infrastructure. Each deployment option—Confluent Kafka, AWS MSK, and On-Premise Kafka—has its own approach to these aspects.

    1. Deployment:

    -> Confluent Kafka

    • Confluent Kafka deployment involves setting up the Confluent Platform, which extends the open-source Kafka with additional tools.
    • Users must install and configure Confluent components, such as Confluent Server, Connect, and Control Center.
    • Deployment may vary based on whether it’s on-premise or in the cloud using Confluent Cloud.

    -> AWS MSK

    • AWS MSK streamlines deployment as it is a fully managed service.
    • Users create an MSK cluster through the AWS Management Console or API, specifying configurations like instance types, storage, and networking.
    • AWS handles the underlying infrastructure, automating tasks like scaling, patching, and maintenance.

    -> On-Premise Kafka:

    • On-premise deployment requires manual setup and configuration of Kafka brokers, Zookeeper, and other components.
    • Organizations must provision and maintain the necessary hardware and networking infrastructure.
    • Deployment must account for high availability, fault tolerance, and scalability, based on organizational needs.

    2. Management:

    -> Confluent Kafka:

    • Confluent provides additional management tools beyond what’s available in open-source Kafka, including Confluent Control Center for monitoring and Confluent Hub for extensions.
    • Users have control over configurations and can leverage Confluent’s tools for streamlining tasks like data integration and schema management.

    -> AWS MSK:

    • AWS MSK offers a managed environment where AWS takes care of routine management tasks.
    • AWS Console and APIs provide tools for monitoring, scaling, and configuring MSK clusters.
    • AWS handles maintenance tasks such as applying patches and updates to the Kafka software.

    -> On-Premise Kafka:

    • On-premise management involves manual oversight of the entire Kafka infrastructure.
    • Organizations have full control but must handle tasks like software updates, monitoring configurations, and addressing any issues that arise.
    • Management may require coordination between IT and operations teams to ensure smooth operation.

    Communication

    Communication has three main components: protocols, network configuration, and inter-cluster communication. Let’s look at them one by one.

    1. Protocols

    -> Confluent Kafka:

    • Uses Kafka’s standard binary protocol over TCP for communication between Kafka brokers and clients.
    • Confluent components may communicate using REST APIs and other protocols specific to Confluent Platform extensions.

    -> AWS MSK:

    • Relies on standard Kafka protocols for communication between clients and brokers.
    • Also employs protocols like TLS/SSL for secure communication.

    -> On-Premise Kafka:

    • The standard Kafka binary protocol over TCP is used for communication between components, including broker-client communication.
    • Specific protocols may vary based on the organization’s network configuration.

    2. Network Configuration

    -> Confluent Kafka:

    • Requires network configuration for Kafka brokers and other Confluent components.
    • Configuration may include specifying listener ports, security settings, and inter-component communication settings.

    -> AWS MSK:

    • Network configuration involves setting up Virtual Private Cloud (VPC) settings, security groups, and subnet configurations.
    • AWS MSK integrates with AWS networking services, allowing organizations to define network parameters.

    -> On-Premise Kafka:

    • Network configuration includes defining IP addresses, ports, and firewall rules for Kafka brokers.
    • Organizations have full control over the on-premise network infrastructure, allowing for custom configurations.

    3. Inter-Cluster Communication:

    -> Confluent Kafka:

    • Confluent components within the same cluster communicate using Kafka protocols.
    • Communication between Confluent Kafka clusters or with external systems may involve additional configurations.

    -> AWS MSK:

    • AWS MSK clusters can communicate with each other within the same VPC or across different VPCs using standard Kafka protocols.
    • AWS networking services facilitate secure and efficient inter-cluster communication.

    -> On-Premise Kafka:

    • Inter-cluster communication on-premise involves configuring network settings to enable communication between Kafka clusters.
    • Organizations have full control over the network architecture for inter-cluster communication.

    Scalability

    Scalability is crucial for ensuring that Kafka deployments can handle varying workloads and grow as demands increase. Whether through vertical or horizontal scaling, the goal is to maintain performance, reliability, and efficiency in the face of changing requirements. Each deployment option—Confluent Kafka, AWS MSK, and on-premise Kafka—provides mechanisms for achieving scalability, with differences in how scaling is implemented and managed.

    -> Confluent Kafka:

    • Horizontal scaling is a fundamental feature of Kafka, allowing for adding more broker nodes to a Kafka cluster to handle increased message throughput.
    • Kafka’s partitioning mechanism allows data to be distributed across multiple broker nodes, enabling parallel processing and improved scalability.
    • Scaling can be achieved dynamically by adding or removing broker nodes, providing flexibility in adapting to varying workloads.
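
    The partitioning idea above can be sketched in a few lines. This is only an illustration under stated assumptions: crc32 stands in for the hash that real Kafka clients apply to record keys, and the round-robin placement of partitions on brokers is a simplification, not Kafka's actual assignment logic. The function names are hypothetical.

```python
import zlib
from collections import Counter


def assign_partition(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition with a stable hash.

    Assumption: crc32 is a stand-in for the hash used by real Kafka clients.
    """
    return zlib.crc32(key) % num_partitions


def partitions_per_broker(num_partitions: int, num_brokers: int) -> Counter:
    """Count partitions per broker under a simple round-robin placement."""
    return Counter(p % num_brokers for p in range(num_partitions))


if __name__ == "__main__":
    keys = [f"user-{i}".encode() for i in range(1000)]
    load = Counter(assign_partition(k, 6) for k in keys)
    print("records per partition:", dict(load))
    # More brokers means fewer partitions (and less traffic) per broker.
    print("3 brokers:", dict(partitions_per_broker(6, 3)))
    print("6 brokers:", dict(partitions_per_broker(6, 6)))
```

    Records with the same key always land in the same partition, and adding brokers lowers the number of partitions each broker has to serve, which is what makes horizontal scaling effective.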

    -> AWS MSK:

    • AWS MSK leverages horizontal scaling by allowing users to adjust the number of broker nodes in a cluster based on demand.
    • AWS provisions and manages the underlying infrastructure for the added brokers, simplifying the process for users.
    • Adding brokers does not by itself move existing partitions on a provisioned cluster; partitions are reassigned afterwards so that the new brokers take a share of the load.

    -> On-Premise Kafka:

    • Scaling Kafka on-premise involves manually adding or removing broker nodes based on the organization’s infrastructure and capacity planning.
    • Organizations need to consider factors such as hardware limitations, network configurations, and load balancing when scaling on-premise Kafka.

    Performance

    Performance is a critical aspect of any Kafka deployment, whether Confluent Kafka, AWS MSK, or on-premise Kafka, because it directly impacts the throughput, latency, and efficiency of the system. Here’s a brief overview of performance considerations for each:

    -> Confluent Kafka

    • Enhancements and Extensions: Confluent Kafka builds upon the open-source Kafka with additional tools and features, potentially providing optimizations and performance enhancements. This may include tools for monitoring, data integration, and schema management.
    • Customization: Organizations using Confluent Kafka have the flexibility to customize configurations and performance settings based on their specific use cases and requirements.
    • Scalability: Confluent Kafka inherits Kafka’s fundamental scalability features, allowing for horizontal scaling by adding more broker nodes to a cluster.

    -> AWS MSK

    • Managed Environment: AWS MSK is a fully managed service, meaning AWS takes care of many operational aspects, including patches, updates, and scaling. This managed environment can contribute to a more streamlined and optimized performance.
    • Scalability: AWS MSK allows for horizontal scaling by adjusting the number of broker nodes in a cluster dynamically. This elasticity contributes to the efficient handling of varying workloads.
    • Integration with AWS Services: Integration with other AWS services can enhance performance in specific scenarios, such as leveraging high-performance storage solutions or integrating with AWS networking services.

    -> On-Premise Kafka

    • Control and Customization: On-premise Kafka deployments provide organizations with complete control over the infrastructure, allowing for fine-tuning and customization to meet specific performance requirements.
    • Scalability Challenges: Scaling on-premise Kafka may present challenges related to manual provisioning of hardware, potential limitations in scalability due to physical constraints, and increased complexity in managing a distributed system.
    • Hardware Considerations: Performance is closely tied to the quality of hardware chosen for on-premise deployment. Organizations must invest in suitable hardware to meet performance expectations.

    Cost Considerations

    Cost considerations for Kafka deployments involve evaluating the direct and indirect expenses associated with setting up, managing, and maintaining a Kafka infrastructure. Kafka cost considerations span licensing, infrastructure, scalability, operations, data transfer, and ongoing support. The choice between Confluent Kafka, AWS MSK, or on-premise Kafka should be made by evaluating these costs in alignment with the organization’s budget, requirements, and preferences.

    Here’s a brief overview of key cost considerations:

    1. Deployment Costs

    -> Confluent Kafka:

    • Involves licensing costs for Confluent Platform, which provides additional tools and features beyond open-source Kafka.
    • Infrastructure costs for hosting Confluent Kafka, whether on-premise or in the cloud.

    -> AWS MSK:

    • Pay-as-you-go pricing model for AWS MSK, where users pay for the resources consumed by the Kafka cluster.
    • Costs include AWS MSK service charges, as well as associated AWS resource costs such as EC2 instances, storage, and data transfer.

    -> On-Premise Kafka:

    • Upfront costs for hardware, networking equipment, and software licenses.
    • Ongoing operational costs, including maintenance, power, cooling, and any required hardware upgrades.

    2. Scalability Costs

    -> Confluent Kafka:

    • Scaling Confluent Kafka may involve additional licensing costs if more resources are needed.
    • Infrastructure costs increase with the addition of more nodes or resources.

    -> AWS MSK:

    • Scaling AWS MSK is dynamic, and users are billed based on the resources consumed during scaling events.
    • Costs may increase with additional broker nodes and associated AWS resources.

    -> On-Premise Kafka:

    • Scaling on-premise Kafka requires manual provisioning of additional hardware, incurring upfront and ongoing costs.
    • Organizations must consider the total cost of ownership (TCO) when planning for scalability.

    3. Operational Costs

    -> Confluent Kafka:

    • Operational costs include manpower for managing and monitoring Confluent Kafka.
    • Costs associated with maintaining Confluent extensions and tools.

    -> AWS MSK:

    • AWS MSK is a managed service, reducing the operational burden on the user.
    • Operational costs may include AWS support charges and personnel for configuring and monitoring the Kafka environment.

    -> On-Premise Kafka:

    • Organizations bear full operational responsibility, incurring staffing, maintenance, monitoring tools, and ongoing support costs.

    4. Data Transfer Costs:

    -> Confluent Kafka:

    • Costs associated with data transfer depend on the chosen deployment model (e.g., cloud-based deployments may have data transfer costs).

    -> AWS MSK:

    • Data transfer costs may apply for communication between AWS MSK clusters and other AWS services.

    -> On-Premise Kafka:

    • Data transfer costs may be associated with network usage, especially if there are multiple geographically distributed Kafka clusters.

    Check the below images of pricing figures for Confluent Kafka and AWS MSK side by side.

    Security

    Security involves implementing measures to protect data, ensure confidentiality, integrity, and availability, and mitigate risks associated with unauthorized access or data breaches. Here’s a brief overview of key security considerations for Kafka deployments:

    1. Authentication:

    -> Confluent Kafka:

    • Supports various authentication mechanisms, including SSL/TLS for encrypted communication and SASL (Simple Authentication and Security Layer) for user authentication.
    • Role-based access control (RBAC) allows administrators to define and manage user roles and permissions.

    -> AWS MSK:

    • Integrates with AWS Identity and Access Management (IAM) for user authentication and authorization.
    • IAM policies control access to AWS MSK resources and actions.

    -> On-Premise Kafka:

    • Authentication mechanisms depend on the chosen Kafka distribution, but SSL/TLS and SASL are common.
    • LDAP or other external authentication systems may be integrated for user management.

    2. Authorization:

    -> Confluent Kafka:

    • RBAC allows fine-grained control over what users and clients can do within the Kafka environment.
    • Access Control Lists (ACLs) can be used to restrict access at the topic level.

    -> AWS MSK:

    • IAM policies define permissions for accessing and managing AWS MSK resources.
    • Fine-grained access control can be enforced using IAM roles and policies.

    -> On-Premise Kafka:

    • ACLs are typically used to specify which users or applications have access to specific Kafka topics or resources.
    • Access controls depend on the Kafka distribution and configuration.

    3. Encryption:

    -> Confluent Kafka:

    • Supports encryption in transit using SSL/TLS for secure communication between clients and brokers.
    • Optionally supports encryption at rest for data stored on disks.

    -> AWS MSK:

    • Encrypts data in transit using SSL/TLS for communication between clients and brokers.
    • Provides options for encrypting data at rest using AWS Key Management Service (KMS).

    -> On-Premise Kafka:

    • Encryption configurations depend on the Kafka distribution and may involve configuring SSL/TLS for secure communication and encryption at rest using platform-specific tools.

    Here are some key points to evaluate AWS MSK, Confluent Kafka, and on-premise Kafka, along with their advantages and disadvantages.

    Conclusion

    • The best approach is to evaluate the level of Kafka expertise in-house and the consistency of your workload.
    • On one side of the spectrum, with less expertise and uncertain workloads, Confluent’s breadth of features and support comes out ahead.
    • On the other side, with experienced Kafka engineers and a very predictable workload, you can save money with MSK.
    • For specific requirements and full control, you can go with on-premise Kafka.
  • Best Practices for Kafka Security

    Overview

    We will cover the security concepts of Kafka and walk through the implementation of encryption, authentication, and authorization for the Kafka cluster.

    This article will explain how to configure SASL_SSL security for your Kafka cluster and how to protect the data in transit. SASL stands for Simple Authentication and Security Layer. SASL_SSL is a security protocol in which clients authenticate using a SASL mechanism such as PLAIN or SCRAM, and the server presents an SSL certificate to establish an encrypted connection. We will use the SCRAM mechanism here for client authentication, which helps establish mutual authentication between the client and server. We’ll also discuss authorization and ACLs, which are important for securing your cluster.

    Prerequisites

    A running Kafka cluster and a basic understanding of security components.

    Need for Kafka Security

    The primary goal is to prevent unauthorized access that could lead to misuse, modification, disruption, or disclosure of data. To understand what makes a Kafka cluster secure, we need to know three terms:

    • Authentication – The process by which a server verifies the identity of a client (a user or an application) before allowing it to connect.
    • Authorization – Applied after authentication, it determines which resources and operations an identified client may access. Each client gets only the limited access it needs.
    • Encryption – The process of transforming data so that it is unreadable without a decryption key. Encryption ensures that a third party who intercepts the traffic cannot read or steal the data.

    Here is the quick start guide by Apache Kafka, so check it out if you still need to set up Kafka.

    https://kafka.apache.org/quickstart

    We’ll not cover the theoretical aspects here, but you can find a ton of sources on how these three components work internally. For now, we’ll focus on the implementation part and how Kafka handles security.

    This image illustrates SSL communication between the Kafka client and server.

    We are going to implement the steps in the below order:

    • Create a Certificate Authority
    • Create a Truststore & Keystore

    Certificate Authority – It is a trusted entity that issues SSL certificates. As such, a CA is an independent entity that acts as a trusted third party, issuing certificates for use by others. A certificate authority validates the credentials of a person or organization that requests a certificate before issuing one.

    Truststore – A truststore contains certificates from other parties with which you want to communicate, or from certificate authorities that you trust to identify other parties. In simple words, it is the list of CAs whose signatures you accept when validating a certificate.

    KeyStore – A KeyStore contains private keys and certificates with their corresponding public keys. Keystores can have one or more CA certificates depending upon what’s needed.

    For the Kafka server, we need a server certificate, and this is where the KeyStore comes into the picture, since it stores the server certificate. The server certificate should be signed by the Certificate Authority (CA): a certificate signing request (CSR) is generated from the KeyStore, and in response, the CA returns a signed certificate that is imported back into the KeyStore.

    We will create our own certificate authority for demonstration purposes. If you don’t want to create a private certificate authority, there are many certificate providers you can go with, like IdenTrust and GoDaddy. Since we are creating one, we need to tell our Kafka client to trust our private certificate authority using the Trust Store.

    This block diagram shows you how all the components communicate with each other and their role to generate the final certificate.

    So, let’s create our Certificate Authority. Run the below command in your terminal:

    openssl req -new -x509 -keyout <private_key_name> -out <public_certificate_name> -days <number_of_days>

    It will ask for a passphrase; keep it safe, because later steps need it. After the command succeeds, we should have two files named private_key_name and public_certificate_name.

    Now, let’s create a KeyStore and trust store for brokers; we need both because brokers also interact internally with each other. Let’s understand with the help of an example: Broker A wants to connect with Broker B, so Broker A acts as a client and Broker B as a server. We are using the SASL_SSL protocol, so A needs SASL credentials, and B needs a certificate for authentication. The reverse is also possible where Broker B wants to connect with Broker A, so we need both a KeyStore and a trust store for authentication.

    Now let’s create a trust store. Execute the below command in the terminal, and it should ask for the password. Save the password for future use:

    keytool -keystore <truststore_name.jks> -alias <alias name of the entry to process> -import -file <public_certificate_name>

    Here, we are using the .jks extension for the file, which stands for Java KeyStore. You can also use Public-Key Cryptography Standards #12 (PKCS12) instead of .jks, but that’s totally up to you. public_certificate_name is the same certificate we generated while creating the CA.

    For the KeyStore configuration, run the below command and store the password:

    keytool -genkey -keystore <keystore_name.jks> -validity <number_of_days> -storepass <store_password> -alias <alias_name> -keyalg <key algorithm name> -ext SAN=DNS:localhost

    This action creates the KeyStore file in the current working directory. The question “First and Last Name” requires you to enter a fully qualified domain name because some certificate authorities, such as VeriSign, expect this property to be a fully qualified domain name. Not all CAs require a fully qualified domain name, but I recommend using a fully qualified domain name for portability. All other information should be valid. If the information cannot be verified, a certificate authority such as VeriSign will not sign the CSR generated for that record. I’m using localhost for the domain name here, as seen in the above command itself.

    The KeyStore now has an entry with alias_name. It contains the private key and the information needed for generating a CSR. Now let’s create a certificate signing request (CSR), which will be used to get a signed certificate from the Certificate Authority.

    Execute the below command in your terminal:

    keytool -keystore <keystore_name.jks> -alias <alias_name> -certreq -file <file_name.csr>

    So, we have generated a certificate signing request using the KeyStore (the KeyStore name and alias name should be the same as in the previous command). It should ask for the KeyStore password, so enter the same one used while creating the KeyStore.

    Now, execute the below command. It will ask for the password, so enter the CA password, and now we have a signed certificate:

    openssl x509 -req -CA <public_certificate_name> -CAkey <private_key_name> -in <csr file> -out <signed_file_name> -CAcreateserial

    Finally, we need to add the public certificate of CA and signed certificate in the KeyStore, so run the below command. It will add the CA certificate to the KeyStore.

    keytool -keystore <keystore_name.jks> -alias <public_certificate_name> -import -file <public_certificate_name>

    Now, let’s run the below command; it will add the signed certificate to the KeyStore.

    keytool -keystore <keystore_name.jks> -alias <alias_name> -import -file <signed_file_name>
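
    The whole certificate workflow can be collected into one ordered list, which makes it easier to review or script. The sketch below only builds and prints the commands; it does not run them. The function name, the CARoot alias, the RSA key algorithm, and the .csr/.crt file names are assumptions chosen for illustration, and the -x509 and -days options are included so that the first step produces a self-signed CA certificate. Passwords are left out so that openssl and keytool prompt for them.

```python
import shlex


def broker_cert_commands(ca_key, ca_cert, keystore, truststore, alias,
                         days=365, host="localhost"):
    """Return the openssl/keytool invocations of this walkthrough, in order."""
    csr = f"{alias}.csr"            # hypothetical file name
    signed = f"{alias}-signed.crt"  # hypothetical file name
    return [
        # 1. Create the private CA (key + self-signed certificate).
        ["openssl", "req", "-new", "-x509", "-keyout", ca_key,
         "-out", ca_cert, "-days", str(days)],
        # 2. Trust the CA: import its certificate into the trust store.
        ["keytool", "-keystore", truststore, "-alias", "CARoot",
         "-import", "-file", ca_cert],
        # 3. Generate the broker key pair inside the KeyStore.
        ["keytool", "-genkey", "-keystore", keystore, "-validity", str(days),
         "-alias", alias, "-keyalg", "RSA", "-ext", f"SAN=DNS:{host}"],
        # 4. Export a certificate signing request (CSR) from the KeyStore.
        ["keytool", "-keystore", keystore, "-alias", alias,
         "-certreq", "-file", csr],
        # 5. Sign the CSR with the CA.
        ["openssl", "x509", "-req", "-CA", ca_cert, "-CAkey", ca_key,
         "-in", csr, "-out", signed, "-CAcreateserial"],
        # 6. Import the CA certificate, then the signed certificate.
        ["keytool", "-keystore", keystore, "-alias", "CARoot",
         "-import", "-file", ca_cert],
        ["keytool", "-keystore", keystore, "-alias", alias,
         "-import", "-file", signed],
    ]


if __name__ == "__main__":
    for cmd in broker_cert_commands("ca-key", "ca-cert",
                                    "broker.keystore.jks",
                                    "broker.truststore.jks", "broker"):
        print(" ".join(shlex.quote(part) for part in cmd))
```

    Printing the commands first and running them by hand keeps the passphrase prompts interactive, which suits a demonstration setup like this one.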

    We have now generated all the security files for the broker. For internal broker communication, we are using SASL_SSL (see security.inter.broker.protocol in server.properties). Next, we need to create a broker username and password using the SCRAM mechanism.

    Run the below command:

    kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <username> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    NOTE: Credentials for inter-broker communication must be created before Kafka brokers are started.
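
    To see why the password itself never needs to be stored, here is a minimal sketch of the key derivation that SCRAM defines (RFC 5802), using SHA-512. It is not Kafka's implementation: the function name is hypothetical, the iteration count of 4096 is an assumed value, and the SASLprep normalization of the password is skipped.

```python
import hashlib
import hmac
import os


def scram_sha512_credential(password: str, salt: bytes,
                            iterations: int = 4096) -> dict:
    """Derive the values a SCRAM server keeps instead of the password.

    Assumptions: iterations=4096 is illustrative; SASLprep is not applied.
    """
    # SaltedPassword = Hi(password, salt, i), which is PBKDF2 with HMAC.
    salted = hashlib.pbkdf2_hmac("sha512", password.encode("utf-8"),
                                 salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha512).digest()
    stored_key = hashlib.sha512(client_key).digest()  # verifies the client
    server_key = hmac.new(salted, b"Server Key", hashlib.sha512).digest()
    return {
        "salt": salt,
        "iterations": iterations,
        "stored_key": stored_key,
        "server_key": server_key,
    }


if __name__ == "__main__":
    cred = scram_sha512_credential("broker-secret", os.urandom(16))
    print({k: (v.hex() if isinstance(v, bytes) else v)
           for k, v in cred.items()})
```

    The server checks the client's proof against stored_key and proves its own identity with server_key, which is where the mutual authentication mentioned earlier comes from.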

    Now, we need to configure the Kafka broker property file, so update the file as given below:

    listeners=SASL_SSL://localhost:9092
    advertised.listeners=SASL_SSL://localhost:9092
    ssl.truststore.location={path/to/truststore_name.jks}
    ssl.truststore.password={truststore_password}
    ssl.keystore.location={/path/to/keystore_name.jks}
    ssl.keystore.password={keystore_password}
    security.inter.broker.protocol=SASL_SSL
    ssl.client.auth=none
    ssl.protocol=TLSv1.2
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";
    super.users=User:{username}

    NOTE: If you are using an external JAAS config file, remove the ScramLoginModule line and set this environment variable before starting the broker: export KAFKA_OPTS=-Djava.security.auth.login.config={path/to/broker.conf}
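
    Before starting the broker, it can help to sanity-check the property file. The following is a small sketch, not an official tool: the function names are hypothetical, the parser handles only simple key=value lines, and the checks cover just the settings discussed in this section.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")  # split at the first '=' only
        props[key.strip()] = value.strip()
    return props


def check_broker_security(props: dict) -> list:
    """Return a list of problems found in the SASL_SSL settings."""
    problems = []
    if "SASL_SSL://" not in props.get("listeners", ""):
        problems.append("listeners does not expose a SASL_SSL endpoint")
    if props.get("security.inter.broker.protocol") != "SASL_SSL":
        problems.append("security.inter.broker.protocol is not SASL_SSL")
    enabled = {m.strip()
               for m in props.get("sasl.enabled.mechanisms", "").split(",")
               if m.strip()}
    inter = props.get("sasl.mechanism.inter.broker.protocol")
    if inter not in enabled:
        problems.append("inter-broker SASL mechanism is not enabled")
    for key in ("ssl.keystore.location", "ssl.truststore.location"):
        if not props.get(key):
            problems.append(f"{key} is not set")
    return problems


if __name__ == "__main__":
    sample = """
    listeners=SASL_SSL://localhost:9092
    security.inter.broker.protocol=SASL_SSL
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    ssl.keystore.location=/path/to/keystore.jks
    ssl.truststore.location=/path/to/truststore.jks
    """
    print(check_broker_security(parse_properties(sample)) or "looks consistent")
```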

    Now, if we run Kafka, the broker should start on port 9092 without any failure. If you have multiple brokers in the cluster, the same config file can be replicated among them, with the listener addresses adjusted for each broker (brokers that share a host need different ports).

    Producers and consumers need a username and a password to access the broker, so let’s create their credentials and update respective configurations.

    Create a producer user by executing the below command in your terminal, and then update producer.properties (in the config directory of a standard Kafka distribution) with the settings that follow.

    bin/kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <producer_name> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    We need a trust store file for our clients (producer and consumer); it is created the same way as the broker trust store above. Producers and consumers should have separate trust stores because, when we move Kafka to production, there could be multiple producers and consumers on different machines.

    security.protocol=SASL_SSL
    ssl.protocol=TLSv1.2
    ssl.truststore.location={path/to/client.truststore.jks}
    ssl.truststore.password={password}
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{producer_name}" password="{password}";
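
    Since the producer and consumer files differ only in their credentials, the client settings can be generated from a small template. This is a sketch with hypothetical function names; it writes the JAAS username and password in double quotes and simply rejects values that contain a double quote rather than guessing at escaping rules.

```python
def jaas_scram(username: str, password: str) -> str:
    """Build the sasl.jaas.config value for SCRAM (values double-quoted)."""
    for value in (username, password):
        if '"' in value:
            raise ValueError("double quotes are not supported in this sketch")
    return ("org.apache.kafka.common.security.scram.ScramLoginModule "
            f'required username="{username}" password="{password}";')


def render_client_properties(username: str, password: str,
                             truststore_path: str,
                             truststore_password: str) -> str:
    """Render the SASL_SSL client settings used in this section."""
    lines = [
        "security.protocol=SASL_SSL",
        "ssl.protocol=TLSv1.2",
        f"ssl.truststore.location={truststore_path}",
        f"ssl.truststore.password={truststore_password}",
        "sasl.mechanism=SCRAM-SHA-512",
        f"sasl.jaas.config={jaas_scram(username, password)}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_client_properties("producer1", "producer-secret",
                                   "/path/to/client.truststore.jks",
                                   "changeit"))
```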

    The below command creates a consumer user; after running it, update consumer.properties (in the config directory of a standard Kafka distribution) with the settings that follow:

    bin/kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <consumer_name> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    security.protocol=SASL_SSL
    ssl.protocol=TLSv1.2
    ssl.truststore.location={path/to/client.truststore.jks}
    ssl.truststore.password={password}
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{consumer_name}" password="{password}";

    At this point, we have implemented encryption and authentication for the Kafka brokers. To verify that our producer and consumer work properly with SCRAM credentials, run the console producer and consumer against a test topic.

    Authorization is not implemented yet. Kafka uses access control lists (ACLs) to specify which users can perform which actions on specific resources or groups of resources. Each ACL has a principal, a permission type, an operation, a resource type, and a name.

    Kafka ships with the AclAuthorizer; Confluent also provides the Confluent Server Authorizer, which is a separate implementation. An authorizer is a server plugin used by Kafka to authorize actions. Specifically, the authorizer decides whether an operation should be allowed based on the principal and the resource being accessed.

    Format of ACLs – Principal P is [Allowed/Denied] Operation O from Host H on any Resource R matching ResourcePattern RP
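
    The ACL format above can be modeled in a few lines to show how a decision is reached. This is a simplified sketch, not the AclAuthorizer itself: the class and function names are hypothetical, only literal resource names and the "*" wildcard are handled, and implied operations and prefixed patterns are ignored. It does reflect three rules: super users bypass ACLs, a matching DENY wins over any ALLOW, and a request with no matching ALLOW is rejected.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str       # e.g. "User:producer1"
    permission: str      # "ALLOW" or "DENY"
    operation: str       # e.g. "READ", "WRITE", "ALL"
    resource_type: str   # e.g. "TOPIC", "GROUP"
    resource_name: str   # literal name or "*"
    host: str = "*"


def is_authorized(acls, super_users, principal, operation,
                  resource_type, resource_name, host) -> bool:
    """Decide a request against a list of ACLs (simplified model)."""
    if principal in super_users:
        return True
    matching = [
        a for a in acls
        if a.principal in (principal, "User:*")
        and a.operation in (operation, "ALL")
        and a.resource_type == resource_type
        and a.resource_name in (resource_name, "*")
        and a.host in (host, "*")
    ]
    if any(a.permission == "DENY" for a in matching):
        return False  # an explicit deny always wins
    return any(a.permission == "ALLOW" for a in matching)


if __name__ == "__main__":
    acls = [
        Acl("User:producer1", "ALLOW", "WRITE", "TOPIC", "orders"),
        Acl("User:consumer1", "ALLOW", "READ", "TOPIC", "orders"),
    ]
    print(is_authorized(acls, {"User:admin"}, "User:producer1",
                        "WRITE", "TOPIC", "orders", "10.0.0.1"))
    print(is_authorized(acls, {"User:admin"}, "User:producer1",
                        "READ", "TOPIC", "orders", "10.0.0.1"))
```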

    Execute the below command to create an ACL with writing permission for the producer:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<producer_name> --operation WRITE --topic <topic_name>

    The above command should create an ACL that allows the WRITE operation for producer_name on topic_name.

    Now, execute the below command to create an ACL with reading permission for the consumer:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<consumer_name> --operation READ --topic <topic_name>

    The consumer also needs permission on its consumer group, so the below command grants READ on the given consumer group ID.

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<consumer_name> --operation READ --group <consumer_group_name>

    Now, we need to add some configuration in two files: server.properties and consumer.properties.

    # Authorizer class
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer

    The above line indicates that AclAuthorizer class is used for authorization.

    # consumer group id
    group.id=<consumer_group_name>

    A consumer group ID is mandatory here: if we do not specify a group, the consumer will not be able to read data from topics, so group.id must be provided to start a consumer.

    Let’s test the producer and consumer one by one. Run the console producer, then run the console consumer in another terminal; both should run without errors.

    console-producer
    console-consumer

    Voila!! Your Kafka is secured.

    Summary

    In a nutshell, we have implemented security in our Kafka cluster using the SASL_SSL protocol and learned how to create ACLs and give different permissions to different users.

    Apache Kafka is the wild west without security. By default, there is no encryption, authentication, or access control list. Any client can communicate with the Kafka broker using the PLAINTEXT port. Access using this port should be restricted to trusted clients only. You can use network segmentation and/or authentication ACLs to restrict access to trusted IP addresses in these cases. If none of these are used, the cluster is wide open and available to anyone. A basic knowledge of Kafka authentication, authorization, encryption, and audit trails is required to safely move a system into production.

  • Real Time Text Classification Using Kafka and Scikit-learn

    Introduction:

    Text classification is one of the essential tasks in supervised machine learning (ML). Assigning categories to text, such as tweets, Facebook posts, web pages, library books, media articles, or galleries, has many applications, including spam filtering and sentiment analysis. In this blog, we build a text classification engine to classify topics in an incoming Twitter stream using Apache Kafka and scikit-learn, a Python-based machine learning library.

    Let’s dive into the details. Here is a diagram to explain visually the components and data flow. The Kafka producer will ingest data from Twitter and send it to the Kafka broker. The Kafka consumer will ask the Kafka broker for the tweets. We convert the binary tweet stream from Kafka to human-readable strings and perform predictions using saved models. We train the models using Twenty Newsgroups, a prebuilt training data set available through scikit-learn. It is a standard data set used for training classification algorithms.

    In this blog we will use the following machine learning models:

    We have used the following libraries/tools:

    • tweepy – Twitter library for python
    • Apache Kafka
    • scikit-learn
    • pickle – Python Object serialization library

    Let’s first understand the following key concepts:

    • Word to Vector Methodology (Word2Vec)
    • Bag-of-Words
    • tf-idf
    • Multinomial Naive Bayes classifier

    Word2Vec methodology

    One of the key ideas in Natural Language Processing (NLP) is how to efficiently convert words into numeric vectors, which can then be given as input to machine learning models to make predictions.

    Neural networks and most other machine learning models are mathematical functions that need numbers or vectors as input. Tree-based methods are a partial exception: some implementations can work on categorical values such as words directly.

    For this we have an approach known as Word2Vec. A trivial alternative is the “one-hot” method, which converts each word into a sparse vector with exactly one element set to 1 and all the others set to 0.

    For example, “the apple a day the good” would have the following representation:

    Here we have transformed the above sentence into a 6×5 matrix: 6 rows, one per word, and 5 columns, the size of the vocabulary, since “the” is repeated. But what do we do when we have a gigantic dictionary to learn from, say more than 100,000 words? Here one-hot encoding breaks down, because every vector becomes as long as the vocabulary. One-hot encoding also loses the relationship between words, for example that “Lanka” should come after “Sri”.
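
    The one-hot representation described above can be sketched in a few lines of plain Python. The function name one_hot_encode is ours, and the column order (first appearance in the sentence) is an assumption, since any fixed ordering of the vocabulary works.

```python
def one_hot_encode(sentence):
    """Return (vocabulary, matrix) with one one-hot row per word in the sentence."""
    words = sentence.split()
    # The vocabulary keeps first-seen order and drops repeated words.
    vocabulary = list(dict.fromkeys(words))
    index = {word: i for i, word in enumerate(vocabulary)}
    matrix = []
    for word in words:
        row = [0] * len(vocabulary)
        row[index[word]] = 1  # exactly one element per row is set to 1
        matrix.append(row)
    return vocabulary, matrix


vocabulary, matrix = one_hot_encode("the apple a day the good")
print(vocabulary)
for row in matrix:
    print(row)
```

    Both occurrences of “the” map to the same row pattern, which is why the matrix has 6 rows but only 5 columns.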

    Here is where Word2Vec comes in. Our goal is to vectorize the words while maintaining the context. Word2vec can utilize either of two model architectures to produce a distributed representation of words: continuous bag-of-words (CBOW) or continuous skip-gram. In the continuous bag-of-words architecture, the model predicts the current word from a window of surrounding context words. The order of context words does not influence prediction (bag-of-words assumption). In the continuous skip-gram architecture, the model uses the current word to predict the surrounding window of context words. 
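
    To make the difference between the two architectures concrete, the sketch below only builds the training examples each one would see. It does not train a neural network. The function names and the window size are assumptions chosen for illustration.

```python
def context_windows(tokens, window=2):
    """Yield (center_word, context_words) for every position in tokens."""
    for i, center in enumerate(tokens):
        left = tokens[max(0, i - window):i]
        right = tokens[i + 1:i + 1 + window]
        yield center, left + right


def cbow_examples(tokens, window=2):
    # CBOW: the context words are the input and the center word is the target.
    # Sorting the context reflects that their order does not matter.
    return [(sorted(context), center)
            for center, context in context_windows(tokens, window)]


def skipgram_examples(tokens, window=2):
    # Skip-gram: the center word is the input and each context word is a target.
    return [(center, context_word)
            for center, context in context_windows(tokens, window)
            for context_word in context]


tokens = "kafka streams tweets to the classifier".split()
print(cbow_examples(tokens, window=1))
print(skipgram_examples(tokens, window=1))
```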

    Tf-idf (term frequency–inverse document frequency)

    TF-IDF is a statistic that reflects how important a word is to a document in a given corpus. Variations of tf-idf are used by search engines, for text summarization, etc. You can read more about tf-idf – here.
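
    As a minimal sketch, the textbook form of the statistic multiplies a term's frequency in a document by the logarithm of the inverse fraction of documents that contain it. The function simple_tf_idf and the toy documents are ours. scikit-learn's TfidfTransformer applies smoothing and normalization by default, so its numbers will differ from this plain version.

```python
import math
from collections import Counter


def simple_tf_idf(documents):
    """Return one {term: weight} dict per document using tf * log(N / df)."""
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)
    # Document frequency: in how many documents does each term appear?
    df = Counter(term for tokens in tokenized for term in set(tokens))
    weights = []
    for tokens in tokenized:
        counts = Counter(tokens)
        weights.append({
            term: (count / len(tokens)) * math.log(n_docs / df[term])
            for term, count in counts.items()
        })
    return weights


docs = ["apple releases new mac", "new bike for sale", "new medicine trial"]
doc_weights = simple_tf_idf(docs)
for weights in doc_weights:
    print(weights)
```

    A word such as “new”, which appears in every document, gets a weight of zero, while a word that appears in only one document is weighted most heavily.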

    Multinomial Naive Bayes classifier

    The Naive Bayes classifier comes from a family of probabilistic classifiers based on Bayes’ theorem. It is used for tasks such as classifying messages as spam or not spam, or articles as sports or politics. We are going to use it to classify the incoming stream of tweets. You can explore it – here.
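
    The idea can be sketched in plain Python: count words per class, smooth the counts, and pick the class with the highest log posterior. This is an illustrative toy with made-up training sentences and hypothetical function names, not the scikit-learn implementation used later in this blog.

```python
import math
from collections import Counter, defaultdict


def train_naive_bayes(documents, labels, alpha=1.0):
    """Fit class priors and Laplace-smoothed word likelihoods from labelled text."""
    class_doc_counts = Counter(labels)
    word_counts = defaultdict(Counter)
    for doc, label in zip(documents, labels):
        word_counts[label].update(doc.lower().split())
    vocabulary = {word for counts in word_counts.values() for word in counts}
    model = {}
    for label, n_docs in class_doc_counts.items():
        # Denominator: all word occurrences in the class plus the smoothing mass.
        total = sum(word_counts[label].values()) + alpha * len(vocabulary)
        model[label] = {
            "log_prior": math.log(n_docs / len(labels)),
            "log_likelihood": {
                word: math.log((word_counts[label][word] + alpha) / total)
                for word in vocabulary
            },
        }
    return model


def predict(model, text):
    """Return the label with the highest log posterior; unseen words are ignored."""
    scores = {}
    for label, params in model.items():
        score = params["log_prior"]
        for word in text.lower().split():
            score += params["log_likelihood"].get(word, 0.0)
        scores[label] = score
    return max(scores, key=scores.get)


train_docs = ["win free money now", "free prize claim now",
              "match score goal", "team wins the match"]
train_labels = ["spam", "spam", "sports", "sports"]
nb_model = train_naive_bayes(train_docs, train_labels)
print(predict(nb_model, "free money"))
print(predict(nb_model, "goal in the match"))
```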

    Let’s see how they fit together.

    The data in the “20 newsgroups” dataset is plain text. We cannot feed it directly to a model that performs mathematical calculations. We have to extract features from the dataset and convert them to numbers that a model can ingest to produce an output.
    So, we use bag-of-words counts and tf-idf to extract features from the dataset and then feed them to a multinomial naive Bayes classifier to get predictions.

    1. Train Your Model

    We are going to use this dataset. We create another file and import the needed libraries. We use sklearn for ML and pickle to save the trained model. Now we define the model.

    from __future__ import division, print_function, absolute_import
    from sklearn.datasets import fetch_20newsgroups  # built-in dataset
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.naive_bayes import MultinomialNB
    import pickle
    from kafka import KafkaConsumer

    # Defining model and training it
    # See http://qwone.com/~jason/20Newsgroups/ for reference
    categories = ["talk.politics.misc", "misc.forsale", "rec.motorcycles",
                  "comp.sys.mac.hardware", "sci.med", "talk.religion.misc"]

    def fetch_train_dataset(categories):
        # Load the training split for the chosen categories
        twenty_train = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
        return twenty_train

    def bag_of_words(categories):
        # Turn each document into a vector of word counts and save the vocabulary
        count_vect = CountVectorizer()
        X_train_counts = count_vect.fit_transform(fetch_train_dataset(categories).data)
        with open("vocab.pickle", 'wb') as vocab_file:
            pickle.dump(count_vect.vocabulary_, vocab_file)
        return X_train_counts

    def tf_idf(categories):
        # Return the fitted transformer and the tf-idf weighted training matrix
        tf_transformer = TfidfTransformer()
        return (tf_transformer, tf_transformer.fit_transform(bag_of_words(categories)))

    def model(categories):
        clf = MultinomialNB().fit(tf_idf(categories)[1], fetch_train_dataset(categories).target)
        return clf

    model = model(categories)
    with open("model.pickle", 'wb') as model_file:
        pickle.dump(model, model_file)
    print("Training Finished!")
    # Training Finished Here

    2. The Kafka Tweet Producer

    We have the trained model in place. Now let’s get the real-time stream from Twitter via Kafka. We define the producer.

    # import required libraries
    from kafka import SimpleProducer, KafkaClient
    from tweepy.streaming import StreamListener
    from tweepy import OAuthHandler
    from tweepy import Stream
    from twitter_config import consumer_key, consumer_secret, access_token, access_token_secret
    import json

    Now we define the Kafka settings and create the KafkaPusher class. This is necessary because we need to send the data coming from the tweepy stream to the Kafka producer.

    # Kafka settings
    topic = b'twitter-stream'

    # setting up Kafka producer
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)

    class KafkaPusher(StreamListener):

        def on_data(self, data):
            # Each message from the stream is a JSON string; forward only the tweet text
            all_data = json.loads(data)
            tweet = all_data["text"]
            producer.send_messages(topic, tweet.encode('utf-8'))
            return True

        def on_error(self, status):
            print(status)

    WORDS_TO_TRACK = ["Politics", "Apple", "Google", "Microsoft", "Bikes", "Harley Davidson", "Medicine"]

    if __name__ == '__main__':
        l = KafkaPusher()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        stream = Stream(auth, l)
        while True:
            try:
                stream.filter(languages=["en"], track=WORDS_TO_TRACK)
            except Exception:
                # Reconnect on stream errors; Ctrl+C still stops the script
                pass

    Note – You need to start the Kafka server before running this script.
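
    The core of on_data, pulling the tweet text out of the JSON payload and encoding it for Kafka, can be tried out without Twitter credentials or a running broker. The sketch below uses a hypothetical helper, tweet_to_kafka_value, and made-up payloads. It also skips messages that carry no text field, which the listing above does not do.

```python
import json


def tweet_to_kafka_value(raw_data):
    """Return the tweet text as UTF-8 bytes, or None if the payload has no text."""
    try:
        payload = json.loads(raw_data)
    except ValueError:
        return None  # not valid JSON
    text = payload.get("text") if isinstance(payload, dict) else None
    if not text:
        return None  # e.g. a delete notice or another non-tweet message
    return text.encode("utf-8")


print(tweet_to_kafka_value('{"text": "Apple unveils a new Mac"}'))
print(tweet_to_kafka_value('{"delete": {"status": {"id": 1}}}'))
```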

    3. Loading your model for predictions

    Now we have the trained model from step 1 and a Twitter stream from step 2. Let’s use the model to make actual predictions. The first step is to load the model:

    # Loading model and vocab
    print("Loading pre-trained model")
    vocabulary_to_load = pickle.load(open("vocab.pickle", 'rb'))
    count_vect = CountVectorizer(vocabulary=vocabulary_to_load)
    load_model = pickle.load(open("model.pickle", 'rb'))
    count_vect._validate_vocabulary()
    # tf_idf() refits the transformer on the training data to recover the idf weights
    tfidf_transformer = tf_idf(categories)[0]
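
    The save-and-load round trip that the pickle files rely on can be checked in isolation. In the sketch below, save_object and load_object are hypothetical helper names, and the toy dictionary stands in for the real vocabulary. The files are opened in with-blocks so they are closed even if pickling fails.

```python
import os
import pickle
import tempfile


def save_object(obj, path):
    # The with-block closes the file even if pickling raises an error.
    with open(path, "wb") as handle:
        pickle.dump(obj, handle)


def load_object(path):
    # Only unpickle files you created yourself: pickle can run code on load.
    with open(path, "rb") as handle:
        return pickle.load(handle)


# Stand-in for count_vect.vocabulary_ (a mapping from word to column index).
toy_vocabulary = {"apple": 0, "bike": 1, "medicine": 2}

with tempfile.TemporaryDirectory() as tmp_dir:
    vocab_path = os.path.join(tmp_dir, "vocab.pickle")
    save_object(toy_vocabulary, vocab_path)
    restored = load_object(vocab_path)

print(restored == toy_vocabulary)
```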

    Then we start the Kafka consumer and begin predictions:

    # Predicting the streaming Kafka messages
    consumer = KafkaConsumer('twitter-stream', bootstrap_servers=['localhost:9092'])
    target_names = fetch_train_dataset(categories).target_names  # load the label names once
    print("Starting ML predictions.")
    for message in consumer:
        tweet = message.value.decode('utf-8')  # Kafka delivers raw bytes
        X_new_counts = count_vect.transform([tweet])
        X_new_tfidf = tfidf_transformer.transform(X_new_counts)
        predicted = load_model.predict(X_new_tfidf)
        print(tweet + " => " + target_names[predicted[0]])

    Following are some of the classifications made by our model:

    • RT @amazingatheist: Making fun of kids who survived a school shooting just days after the event because you disagree with their politics is… => talk.politics.misc
    • sci.med
    • RT @DavidKlion: Apropos of that D’Souza tweet; I think in order to make sense of our politics, you need to understand that there are some t… => talk.politics.misc
    • RT @BeauWillimon: These students have already cemented a place in history with their activism, and they’re just getting started. No one wil… => talk.politics.misc
    • RT @byedavo: Cause we ain’t got no president => talk.politics.misc
    • RT @appleinsider: .@Apple reportedly in talks to buy cobalt, key Li-ion battery ingredient, directly from miners … => comp.sys.mac.hardware

    Here is the link to the complete git repository

    Conclusion:

    In this blog, we built a data pipeline that uses a Naive Bayes model to classify streaming Twitter data. The same approach can classify other sources of data, such as news articles and blog posts. Do let us know in the comments section below if you have any questions or additional thoughts.

    Happy coding!