Tag: apache kafka

  • A Comprehensive Guide to Unlock Kafka MirrorMaker 2.0

    Overview

    We are covering how Kafka MirrorMaker operates, how to set it up, and how to test mirrored data.

    MirrorMaker 2.0 is the replication feature introduced in Kafka 2.4, defined as part of Kafka Improvement Proposal KIP-382. Kafka MirrorMaker 2 is designed to replicate or mirror topics from one Kafka cluster to another. It uses the Kafka Connect framework to simplify configuration and scaling. MirrorMaker dynamically detects changes to source topics and keeps source and target topic properties synchronized, including topic data, offsets, and partitions. When a new topic is created in the source cluster, it is replicated to the target cluster together with its data, offsets, and partitions.

    Use Cases

    Disaster Recovery

    Though Kafka is highly distributed and provides a high level of fault tolerance, disasters can still happen, and data can still become temporarily unavailable—or lost altogether. The best way to mitigate the risks is to have a copy of your data in another Kafka cluster in a different data center. MirrorMaker translates and syncs consumer offsets to the target cluster. That way, we can switch clients to it relatively seamlessly, moving to an alternative deployment on the fly with minor or no service interruptions.

    Closer Read / Writes

    Kafka producer clients often prefer to write locally to achieve low latency, but business requirements may demand the data be read by different consumers, often deployed in multiple regions. This can quickly make deployments complex due to VPC peering. MirrorMaker can handle all of the replication complexity, making local-write, local-read setups much easier to run.

    Data Analytics

    Aggregation is also a factor in data pipelines, which might require the consolidation of data from regional Kafka clusters into a single one. That aggregate cluster then broadcasts that data to other clusters and/or data systems for analysis and visualization.

    Supported Topologies

    • Active/Passive or Active/Standby high availability deployments – (ClusterA => ClusterB)
    • Active/Active HA Deployment – (ClusterA => ClusterB and ClusterB => ClusterA)
    • Aggregation (e.g., from many clusters to one): (ClusterA => ClusterK, ClusterB => ClusterK, ClusterC => ClusterK)
    • Fan-out (opposite of Aggregation): (ClusterK => ClusterA, ClusterK => ClusterB, ClusterK => ClusterC)
    • Forwarding: (ClusterA => ClusterB, ClusterB => ClusterC, ClusterC => ClusterD)

    Salient Features of MirrorMaker 2

    • Mirrors Topic and Topic Configuration – Detects and mirrors new topics and config changes automatically, including the number of partitions and replication factors.
    • Mirrors ACLs – Mirrors Topic ACLs as well, though we found issues in replicating WRITE permission. Also, replicated topics often contain source cluster names as a prefix, which means existing ACLs need to be tweaked, or ACL replication may need to be managed externally if the topologies are more complex.
    • Mirrors Consumer Groups and Offsets – Seamlessly translates and syncs Consumer Group Offsets to target clusters to make it easier to switch from one cluster to another in case of disaster.
    • Ability to Update MM2 Config Dynamically – MirrorMaker is backed by Kafka Connect Framework, which provides REST APIs through which MirrorMaker configurations like replicating new topics, stopping replicating certain topics, etc. can be updated without restarting the cluster.
    • Fault-Tolerant and Horizontally Scalable Operations – The number of processes can be scaled horizontally to increase performance.

    How Kafka MirrorMaker 2 Works

    MirrorMaker uses a set of standard Kafka Connect connectors, each with its own role. The connectors and their functions are listed below.

    • MirrorSourceConnector: Replicates topics, topic ACLs, and configs from the source cluster to the target cluster.
    • MirrorCheckpointConnector: Syncs consumer offsets, emits checkpoints, and enables failover.
    • MirrorHeartbeatConnector: Emits heartbeats to check connectivity between the source and target clusters.

    MirrorMaker Running Modes

    There are three ways to run MirrorMaker:

    • As a dedicated MirrorMaker cluster (can be distributed with multiple replicas having the same config): In this mode, MirrorMaker does not require an existing Connect cluster. Instead, a high-level driver manages a collection of Connect workers.
    • As a standalone Connect worker: In this mode, a single Connect worker runs MirrorSourceConnector. This does not support multi-clusters, but it’s useful for small workloads or for testing.
    • In legacy mode, using existing MirrorMaker scripts: after legacy MirrorMaker is deprecated, the existing ./bin/kafka-mirror-maker.sh script will be updated to run MM2 in legacy mode.

    Setting up MirrorMaker 2

    We recommend running MirrorMaker as a dedicated MirrorMaker cluster since it does not require an existing Connect cluster. Instead, a high-level driver manages a collection of Connect workers. The cluster can be easily converted to a distributed cluster just by adding multiple replicas of the same configuration. A distributed cluster is required to reduce the load on a single node cluster and also to increase MirrorMaker throughput.

    Prerequisites

    • Docker
    • Docker Compose

    Steps to Set Up MirrorMaker 2

    Set up a single-node source cluster, a single-node target cluster, and a MirrorMaker node to run MirrorMaker 2.

    1. Clone the repository:

    git clone https://gitlab.com/velotio/kafka-mirror-maker.git

    2. Run the below command to start the Kafka clusters and the MirrorMaker Docker container:

    docker-compose up -d

    3. Log in to the mirror-maker Docker container:

    docker exec -it $(docker ps | grep "mirror-maker-node-1" | awk '{print $1}') bash

    4. Start MirrorMaker:

    connect-mirror-maker.sh ./mirror-maker-config.properties
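    For reference, a minimal MirrorMaker 2 properties file looks roughly like the sketch below. The repository ships its own mirror-maker-config.properties, so treat this as illustrative only; the cluster aliases source-kafka and target-kafka are assumptions matching the Docker Compose service names.

```properties
# Illustrative MM2 config (assumed aliases; the repo's file may differ)
clusters = source-kafka, target-kafka
source-kafka.bootstrap.servers = source-kafka:9092
target-kafka.bootstrap.servers = target-kafka:9092

# Replicate all topics from source to target
source-kafka->target-kafka.enabled = true
source-kafka->target-kafka.topics = .*

# Translate and sync consumer group offsets to the target
sync.group.offsets.enabled = true

# Single-node clusters, so internal topics need replication factor 1
replication.factor = 1
checkpoints.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
offset-syncs.topic.replication.factor = 1
```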

    5. Monitor the logs of the MirrorMaker container; you should see output like the following:

    • [2024-02-05 04:07:39,450] INFO [MirrorCheckpointConnector|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    • [2024-02-05 04:07:49,246] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 1 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    • [2024-02-05 04:07:49,337] INFO [MirrorSourceConnector|worker] refreshing topics took 3 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    • [2024-02-05 04:07:49,450] INFO [MirrorCheckpointConnector|task-0] refreshing idle consumers group offsets at target cluster took 2 ms (org.apache.kafka.connect.mirror.Scheduler:95)

    6. Create a topic at the source cluster: 

    kafka-topics.sh --create --bootstrap-server source-kafka:9092 --topic test-topic --partitions 1 --replication-factor 1

    7. List topics and validate the topic: 

    kafka-topics.sh --list --bootstrap-server source-kafka:9092

    8. Produce 100 messages on the topic:

    for x in {1..100}; do echo "message $x"; done | kafka-console-producer.sh --broker-list source-kafka:9092 --topic test-topic

    9. Check whether the topic is mirrored in the target cluster.

    Note: The mirrored topic will have a source cluster name prefix to be able to identify which source cluster the topic is mirrored from.

    kafka-topics.sh --list --bootstrap-server target-kafka:9092
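    The prefixing follows MirrorMaker's default replication policy: the remote topic name is the source cluster alias, a dot, and the original topic name. A trivial shell stand-in (the alias source-kafka is assumed from this setup):

```shell
# DefaultReplicationPolicy naming: "<source-alias>.<topic>"
alias="source-kafka"
topic="test-topic"
echo "${alias}.${topic}"   # -> source-kafka.test-topic
```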

    10. Consume 5 messages from the source Kafka cluster:

    kafka-console-consumer.sh --bootstrap-server source-kafka:9092 --topic test-topic --max-messages 5 --consumer-property enable.auto.commit=true --consumer-property group.id=test-group --from-beginning

    11. Describe the consumer group at the source and destination to verify that consumer offsets are also mirrored:

    kafka-consumer-groups.sh --bootstrap-server source-kafka:9092 --group test-group --describe

    kafka-consumer-groups.sh --bootstrap-server target-kafka:9092 --group test-group --describe

    12. Consume five messages from the target Kafka cluster. Consumption should resume from the offset committed in the source cluster, so in this case the first message consumed will be message 6 (offset 5).

    kafka-console-consumer.sh --bootstrap-server target-kafka:9092 --topic source-kafka.test-topic --max-messages 5 --consumer-property enable.auto.commit=true --consumer-property group.id=test-group --from-beginning

    Conclusion

    We’ve seen how to set up MirrorMaker 2.0 in a dedicated instance. This running mode does not need a running Connect cluster as it leverages a high-level driver that creates a set of Connect workers based on the MirrorMaker properties configuration file.

  • Setting up Mutual TLS Authentication and Authorization on Amazon MSK

    Overview

    We will cover how to set up mutual TLS authentication and authorization on Amazon MSK.

    Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. You can enable client authentication with TLS for connections and client authorization from your applications to your Amazon MSK brokers and ZooKeeper nodes. 

    Prerequisites

    • Terraform: For creating a private CA and MSK Cluster
    • AWS CLI: For creating TLS certificates (the user must have access to create a private CA, issue certificates, and create MSK cluster)

    Set up TLS authentication and authorization

    To use client authentication with TLS on MSK, you need to create the following resources:

    • AWS Private CA
    • MSK cluster with TLS encryption enabled
    • Client certificates

    Create AWS Private CA

    An AWS Private CA can be either in the same AWS account as your cluster or in a different account. For information about AWS Private CAs, see Creating and Managing an AWS Private CA. In this setup, we will use Terraform to create a private CA.

    Steps to create Private CA

    1. Run the below Terraform code to create the private CA.

    terraform {
      required_providers {
        aws = {
          source  = "hashicorp/aws"
          version = "~> 4.0"
        }
      }
    }

    resource "aws_acmpca_certificate_authority" "root_ca" {
      certificate_authority_configuration {
        key_algorithm     = "RSA_4096"
        signing_algorithm = "SHA512WITHRSA"
        subject {
          # Update the attributes as per your need
          common_name         = "exp-msk-ca"
          country             = "US"
          locality            = "Seattle"
          organization        = "Example Corp"
          organizational_unit = "Sales"
          state               = "WA"
        }
      }
      type = "ROOT"
    }

    2. Once the private CA is created, install the certificate from the AWS console.

    Steps to install the certificate

    • If you are not already on the CA’s details page, open the AWS Private CA console at https://console.aws.amazon.com/acm-pca/home. On the private certificate authorities page, choose a root CA that you have created with a certificate status of Pending or Active.
    • Choose Actions, then Install CA certificate to open the Install root CA certificate page.
    • Under Specify the root CA certificate parameters, specify the following certificate parameters:
    • Validity — Specifies the expiration date and time for the CA certificate. The AWS Private CA default validity period for a root CA certificate is ten years.
    • Signature algorithm — Specifies the signing algorithm to use when the root CA issues new certificates (e.g., SHA256 RSA). Available options vary according to the AWS Region where you are creating the CA. For more information, see Compatible signing algorithms, Supported cryptographic algorithms, and SigningAlgorithm in CertificateAuthorityConfiguration.
    • Review your settings to make sure they’re correct, then choose Confirm and install.
    • The details page for the CA displays the status of the installation (success or failure) at the top. If the installation was successful, the newly completed root CA displays a status of Active in the General pane.

    Create an MSK cluster that supports TLS client authentication

    Note: We highly recommend using an independent AWS Private CA for each MSK cluster when you use mutual TLS to control access. Doing so ensures that TLS certificates signed by a private CA authenticate only with a single MSK cluster.

    Run the below Terraform code to create the MSK cluster.

    Note: Update the attributes as per your requirements and configurations.

    terraform {
      required_providers {
        aws = {
          source  = "hashicorp/aws"
          version = "~> 4.0"
        }
      }
    }
    module "kafka" {
      source = "cloudposse/msk-apache-kafka-cluster/aws"
      # Cloud Posse recommends pinning every module to a specific version
      version                       = "2.3.0"
      name                          = "test-msk-cluster" #Change MSK cluster name as per your need
      vpc_id                        = "<VPC_ID>" 
      subnet_ids                    = ["SUBNET1a","SUBNET2b"] # Minimum 2 subnets required.
      kafka_version                 = "3.4.0" # Kafka version supported by MSK
      broker_per_zone               = 1 #Number of broker per availability zone
      broker_instance_type          = "kafka.t3.small" #MSK instance types
      broker_volume_size            = 10 #Broker disk size
      certificate_authority_arns    = ["<CA_ARN>"]  #arn of the CA that you have created in the earlier step
      client_tls_auth_enabled       = true
      encryption_in_cluster         = true 
      client_broker                 = "TLS" # Enables TLS encryption
      enhanced_monitoring           = "PER_TOPIC_PER_BROKER"
      cloudwatch_logs_enabled       = false # Enable if you need cloudwatch logs
      jmx_exporter_enabled          = false # Enable if you need jmx metrics
      node_exporter_enabled         = false # Enable if you need node metrics
      associated_security_group_ids = [aws_security_group.kafka_sg.id]
      allowed_security_group_ids    = [aws_security_group.kafka_sg.id]
      create_security_group         = false
    }
    #-----------------------End--------------------#
    resource "aws_security_group" "kafka_sg" {
      name        = "test-msk-cluster-sg" #Change the name as per your need 
      description = "Security Group for kafka cluster"
      vpc_id      = "<VPC_ID>"
      egress {
        from_port        = 0
        to_port          = 0
        protocol         = "-1"
        cidr_blocks      = ["0.0.0.0/0"]
        ipv6_cidr_blocks = ["::/0"]
      }
      ingress {
        from_port        = 2181
        to_port          = 2181
        protocol         = "tcp"
        cidr_blocks      = ["0.0.0.0/0"]
        ipv6_cidr_blocks = ["::/0"]
      }
      ingress {
        from_port        = 9094
        to_port          = 9094
        protocol         = "tcp"
        cidr_blocks      = ["0.0.0.0/0"]
        ipv6_cidr_blocks = ["::/0"]
      }
      # Enable if you need to add tags to MSK cluster
      #tags = var.tags
      # Enable if you need cloudwatch logs
      # depends_on = [
      #   aws_cloudwatch_log_group.cw_log_group
      #]
    }
    # Required for cloudwatch logs
    # resource "aws_cloudwatch_log_group" "cw_log_group" {
    #   name = "blog-msk-cluster"
    #   #tags = var.tags
    # }
    
    output "bootstrap_url" {
      value       = module.kafka.bootstrap_brokers_tls
      description = "Comma separated list of one or more DNS names (or IP addresses) and TLS port pairs for access to the Kafka cluster using TLS"
    }

    It will take 15-20 minutes to create the MSK cluster. 

    Note: Since the bootstrap URL will be used to communicate with the MSK cluster using the Kafka CLI or SDKs, save it from the Terraform output.

    Create TLS certificates using previously created AWS Private CA

    We will create two certificates: one for admin access and one for client access. Creating a certificate requires a common name (CN); the CN is used as the principal when granting permissions through Kafka ACLs.

    Create admin TLS certificate

    Steps to create TLS certificate

    1. Generate CSR and key.
    openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes -subj '/CN=admin'

    2. Issue a certificate using the previously created private CA (replace <CA_ARN> with the ARN of the AWS Private CA that you created).
    certArn=$(aws acm-pca issue-certificate --region <region> \
      --certificate-authority-arn "<CA_ARN>" --csr fileb://cert.csr \
      --signing-algorithm 'SHA256WITHRSA' --validity Value=180,Type='DAYS' \
      --query 'CertificateArn' --output text)

    3. Retrieve the certificate issued in the previous step.
    aws acm-pca get-certificate --region <region> --certificate-authority-arn \
      "<CA_ARN>" --certificate-arn "${certArn}" --output text | \
      sed 's/\t/\n/g' > cert.pem
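    The sed call is there because aws acm-pca get-certificate with --output text returns the certificate and its chain separated by a tab; replacing tabs with newlines yields a valid PEM file. A quick stand-in demonstration with dummy strings:

```shell
# Stand-in for the tab-separated AWS CLI text output
printf 'CERT-BODY\tCHAIN-BODY\n' | sed 's/\t/\n/g'
# prints CERT-BODY and CHAIN-BODY on separate lines
```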

    4. Export the certificate in PKCS12 format.
    openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator \
      -password pass: -out admin.p12
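    Since the CN in the CSR later becomes the Kafka principal ("User:CN=admin"), it is worth confirming the subject before submitting the CSR to the CA. The check below is a self-contained sketch that generates a throwaway CSR under /tmp rather than touching the real key; adapt the paths to your actual cert.csr.

```shell
# Generate a throwaway CSR with the same subject as the admin certificate
openssl req -newkey rsa:2048 -keyout /tmp/demo-key.pem -out /tmp/demo.csr \
  -batch -nodes -subj '/CN=admin'
# Print the subject; this CN is what Kafka ACLs see as "User:CN=admin"
openssl req -in /tmp/demo.csr -noout -subject
```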

    Create client TLS certificate

    1. Generate a CSR and key.
    openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes \
      -subj '/CN=client'

    2. Issue a certificate using the previously created private CA (replace <CA_ARN> with the ARN of the AWS Private CA that you created).
    certArn=$(aws acm-pca issue-certificate --region <region> \
      --certificate-authority-arn "<CA_ARN>" --csr fileb://cert.csr \
      --signing-algorithm 'SHA256WITHRSA' --validity Value=180,Type='DAYS' \
      --query 'CertificateArn' --output text)

    3. Retrieve the certificate issued in the previous step.
    aws acm-pca get-certificate --region <region> --certificate-authority-arn \
      "<CA_ARN>" --certificate-arn "${certArn}" --output text | \
      sed 's/\t/\n/g' > cert.pem

    4. Export the certificate in PKCS12 format.
    openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator \
      -password pass: -out client.p12

    Set up a client machine to interact with the MSK cluster

    1. Create an Amazon EC2 instance to use as a client machine. For simplicity, create this instance in the same VPC you used for the cluster. See Step 3: Create a client machine for an example of how to create such a client machine.
    2. Copy previously created certificates admin.p12 and client.p12 into the client machine.
    3. Install Java 8+ on the client machine.
    4. Download and extract the Kafka binaries:
    curl -O https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
    tar -xzf kafka_2.13-3.4.0.tgz
    5. Create admin and client configuration files for authentication and authorization.
    cat <<EOF> admin.properties
    bootstrap.servers=<BOOTSTRAP_URL>
    security.protocol=SSL
    ssl.keystore.location=./admin.p12
    ssl.keystore.type=PKCS12
    ssl.keystore.password=
    EOF
    cat <<EOF> client.properties
    bootstrap.servers=<BOOTSTRAP_URL>
    security.protocol=SSL
    ssl.keystore.location=./client.p12
    ssl.keystore.type=PKCS12
    ssl.keystore.password=
    EOF
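    Before pointing the Kafka CLI at the cluster, the keystores can be sanity-checked locally. The block below is a self-contained sketch: it builds a throwaway PKCS12 from a self-signed certificate (the real flow uses the CA-issued cert.pem) and reads it back with the empty password that matches ssl.keystore.password= above.

```shell
# Throwaway self-signed cert standing in for the CA-issued one
openssl req -x509 -newkey rsa:2048 -keyout /tmp/demo-key.pem \
  -out /tmp/demo-cert.pem -batch -nodes -days 1 -subj '/CN=client'
# Export to PKCS12 exactly like the earlier export step (empty password)
openssl pkcs12 -export -in /tmp/demo-cert.pem -inkey /tmp/demo-key.pem \
  -name ssl-configurator -password pass: -out /tmp/demo.p12
# Read the certificate back out; failure here means a keystore/password mismatch
openssl pkcs12 -in /tmp/demo.p12 -password pass: -nokeys -out /tmp/roundtrip.pem
```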

    Test Authentication and Authorization using ACLs

    Create Admin ACLs for granting admin access to clusters, topics, and groups

    By default, the MSK cluster allows all actions if no ACL is found for a resource. The admin ACL created here will be the first ACL. The admin user (“User:CN=admin”) will then use it to grant permissions to the client user (“User:CN=client”).
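    This open-by-default behavior corresponds to the broker property below, which Amazon MSK’s default configuration sets to true (verify against your cluster configuration if you have customized it):

```properties
# When no ACL matches a resource, allow access (MSK default)
allow.everyone.if.no.acl.found=true
```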

    ACL for managing cluster operations (Admin ACL).
    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --add \
      --allow-principal "User:CN=admin" \
      --operation All \
      --cluster \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    ACL for managing topic permissions (Admin ACL).
    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --add \
      --allow-principal "User:CN=admin" \
      --operation All \
      --topic "*" \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    ACL for managing group permissions (Admin ACL).
    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --add \
      --allow-principal "User:CN=admin" \
      --operation All \
      --group "*" \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    Create a topic.

    ./kafka_2.13-3.4.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>" \
      --create --topic test-topic --command-config admin.properties

    List topics and check that the topic was created.

    ./kafka_2.13-3.4.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>" \
      --list --command-config admin.properties

    Grant write permission on the topic so that the client (producer) can publish messages to it (use the admin user to grant access to the client).

    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --add \
      --allow-principal "User:CN=client" \
      --operation Write \
      --topic "test-topic" \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    Publish messages to the topic as the client user.

    for x in {1..10}; do echo "message $x"; done | \
      ./kafka_2.13-3.4.0/bin/kafka-console-producer.sh --bootstrap-server \
      "<BOOTSTRAP_URL>" --producer.config client.properties --topic \
      test-topic --producer-property enable.idempotence=false

    Consume messages.

    Note: If you try to consume messages from the topic using a consumer group, you will get a group authorization error since the client user is not authorized to access groups.

    ./kafka_2.13-3.4.0/bin/kafka-console-consumer.sh --bootstrap-server \
      "<BOOTSTRAP_URL>" --topic test-topic --max-messages 2 --consumer-property \
      enable.auto.commit=false --consumer-property group.id=consumer-test \
      --from-beginning --consumer.config client.properties

    Grant group permission to the client user (the prefixed resource pattern matches any group whose name starts with consumer-).

    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --add \
      --allow-principal "User:CN=client" \
      --operation Read \
      --resource-pattern-type prefixed \
      --group 'consumer-' \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    After providing group access, the client user should be able to consume messages from the topic using a consumer group.

    This way, you can manage client access to the topics and groups.

    Additional Commands 

    List ACLs.
    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --list \
      --command-config admin.properties

    Delete an ACL (the create and delete ACL commands are the same except for the --add/--remove argument).
    ./kafka_2.13-3.4.0/bin/kafka-acls.sh \
      --remove \
      --allow-principal "User:CN=admin" \
      --operation All \
      --cluster \
      --bootstrap-server "<BOOTSTRAP_URL>" \
      --command-config admin.properties

    Conclusion

    AWS MSK eases the effort of managing independently hosted Kafka clusters. Users can scale Kafka brokers and storage as necessary. MSK supports TLS encryption and allows users to create TLS connections from the application to Amazon MSK brokers and ZooKeeper nodes with the help of the AWS Private CA, which enables users to create certificates for authentication.