Kafka-101

Apache Kafka is a distributed event streaming platform that is used for building real-time data pipelines and streaming applications. It is designed to handle high-throughput, fault-tolerant, and scalable data processing. Kafka is widely used in various industries to manage and process large streams of data in real time.

Here's a comprehensive breakdown of Kafka:

Overview

  • Origin: Kafka was originally developed by LinkedIn and open-sourced in 2011. It is now maintained by the Apache Software Foundation.

  • Core Concept: At its core, Kafka is a distributed system that allows applications to publish and subscribe to streams of records in a fault-tolerant manner.

Key Concepts

  1. Producers: Applications that publish data to Kafka topics. Producers send data records to topics in the Kafka cluster.

  2. Consumers: Applications that subscribe to topics and process the data. Consumers read data from Kafka topics.

  3. Topics: A category or feed name to which records are published. Topics are partitioned, and each partition is an ordered, immutable sequence of records.

  4. Partitions: Each topic is split into partitions, which are the basic unit of parallelism in Kafka. Partitions enable Kafka to scale horizontally by distributing data across multiple brokers.

  5. Brokers: Kafka runs as a cluster of one or more servers, each of which is called a broker. Brokers manage data storage and retrieval for the partitions.

  6. Clusters: A Kafka cluster is a collection of brokers that work together to manage topics, partitions, and data replication.

  7. Replication: Kafka replicates data across multiple brokers to ensure fault tolerance. Each partition has a configurable number of replicas, ensuring data durability and availability.

  8. ZooKeeper: Originally used by Kafka to manage cluster metadata, coordinate brokers, and track partitions. Kafka has been replacing ZooKeeper with its own internal metadata management system, called KRaft.

How Kafka Works

  • Data Flow: Producers publish messages to a specific topic, and these messages are stored in partitions within the topic. Each message is assigned an offset, which is a unique identifier within the partition.

  • Offset: Consumers read messages by their offset, allowing them to process data at their own pace and track their progress.

  • Log-Based Storage: Kafka stores data in a log format, where each partition is an immutable sequence of records, and new data is appended to the end of the log.

  • Retention: Kafka retains data for a configurable period, allowing consumers to replay data as needed.

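To make offsets and retention concrete, here is a minimal sketch using Kafka's bundled console tools (assuming a single broker reachable at localhost:9092 and that the commands are run from a Kafka installation directory; the topic and group names are placeholders):

# Create a topic with three partitions
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo-events --partitions 3 --replication-factor 1

# Publish records; each record is appended to a partition and assigned the next offset
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo-events

# Read the topic from the beginning; data can be replayed as long as it is within the retention window
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-events --from-beginning --group demo-group
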
Use Cases

  1. Real-Time Analytics: Kafka is used for processing and analyzing data streams in real time, such as monitoring user activities, tracking system metrics, and processing IoT sensor data.

  2. Data Integration: Kafka acts as a central hub for integrating data from various sources, including databases, applications, and external systems, into a unified stream for further processing.

  3. Log Aggregation: Kafka collects and centralizes log data from multiple systems, making it easier to analyze and monitor logs across a distributed environment.

  4. Event Sourcing: Kafka is used in event-driven architectures to capture changes in application state as a sequence of events, allowing for reliable state reconstruction.

  5. Message Queueing: Kafka serves as a robust message queue, providing durable storage and high-throughput message delivery between producers and consumers.

Benefits

  • Scalability: Kafka can scale horizontally by adding more brokers and partitions to handle increased data volume and throughput.

  • Fault Tolerance: Kafka's replication mechanism ensures data durability and availability, even in the event of broker failures.

  • High Throughput: Kafka is optimized for high-throughput and low-latency data streaming, making it suitable for handling large volumes of data.

  • Durability: Kafka's log-based storage ensures data persistence and allows consumers to replay messages as needed.

Kafka Cluster Installation on Kubernetes and Monitoring with Prometheus and Grafana

Pre-requisites:

  • A Kubernetes Cluster

  • Helm Installed

  • Longhorn Installed on the Cluster

For this walkthrough, the Kubernetes cluster runs locally and was set up using Kubespray.

We will use Helm to install nearly all the components for this blog.

Let's get started. Step 1 is to install Longhorn:

# Install Longhorn via Helm
helm repo add longhorn https://charts.longhorn.io
helm repo update
kubectl create namespace longhorn-system
helm install longhorn longhorn/longhorn --namespace longhorn-system

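Before moving on, it is worth confirming that the Longhorn components have come up:

kubectl -n longhorn-system get pods
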
Step 2: Edit the Longhorn Service

Edit the longhorn-frontend service to change its type to NodePort.

  1. Edit the Service

    Use kubectl to edit the service:

kubectl edit svc longhorn-frontend -n longhorn-system

  2. Modify the Service Type

In the editor that opens, find the spec section and change the type from ClusterIP (default) to NodePort. Optionally, specify a nodePort to set a fixed port, or leave it empty to let Kubernetes automatically assign one.

Here's how you should modify the spec section:

spec:
  type: NodePort
  ports:
  - port: 80
    targetPort: 80
    protocol: TCP
    nodePort: 30080 # Optional: Specify a node port here

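If you prefer a non-interactive change, the same result can be achieved with kubectl patch instead of editing the service by hand (equivalent to the edit above; the nodePort is then auto-assigned by Kubernetes):

kubectl -n longhorn-system patch svc longhorn-frontend -p '{"spec": {"type": "NodePort"}}'
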
Longhorn is now accessible via the worker node IPs on the assigned NodePort, for example:

http://192.168.1.61:30767/#/node

Step 1: Install the Strimzi Operator

  1. Add Strimzi Helm Repository

    Add the Strimzi Helm repository to your local Helm installation:

     helm repo add strimzi https://strimzi.io/charts/
    
  2. Update Helm Repositories

    Update your Helm repositories to ensure you have the latest version of the Strimzi chart:

     helm repo update
    
  3. Install Strimzi Operator

    Use Helm to install the Strimzi Operator in a dedicated namespace (e.g., kafka):

     kubectl create namespace kafka
     helm install strimzi strimzi/strimzi-kafka-operator --namespace kafka
    
  4. Verify Installation

    Check that the Strimzi Operator pods are running:

     kubectl get pods -n kafka
    

    You should see the operator pods with the Running status.

Step 2: Deploy a Kafka Cluster with Longhorn Storage

Now that the Strimzi Operator is installed, you can deploy a Kafka cluster with Longhorn as the storage backend.

  1. Create Kafka Cluster YAML

    Create a file named kafka-cluster.yaml with the following configuration, which uses Longhorn for persistent storage:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.7.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.7"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
          class: longhorn
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
      class: longhorn
  entityOperator:
    topicOperator: {}
    userOperator: {}
  • Make sure that the class field under storage is set to longhorn, which matches the Longhorn StorageClass.

  • Apply the Kafka Configuration

    Deploy the Kafka cluster using the configuration file:

      kubectl apply -f kafka-cluster.yaml
    
  • Verify the Deployment

    Check the status of the Kafka cluster deployment:

      kubectl get kafka -n kafka
    

    You should see the Kafka cluster with the desired number of replicas running.

  • Verify the Persistent Volumes

    Check that the persistent volumes are created and bound using Longhorn:

      kubectl get pvc -n kafka
    

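If you want to block until the cluster is fully ready (handy in scripts), you can also wait on the Kafka resource's Ready condition, as in the Strimzi quickstart:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
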
Once the cluster is up and running, we are going to create a topic. Create a file named kafka-topic.yaml with the following configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
  • apiVersion: kafka.strimzi.io/v1beta2 specifies the Strimzi version for the Kafka topic resource.

  • kind: KafkaTopic indicates that this is a Kafka topic resource managed by Strimzi.

  • metadata:

    • name: my-topic is the name of the Kafka topic.

    • labels: The label strimzi.io/cluster: my-cluster ties this topic to the Kafka cluster named my-cluster.

  • spec:

    • partitions: 1 specifies that the topic will have a single partition.

    • replicas: 1 means that the topic will have one replica per partition. This is appropriate for a development or testing environment but might need adjustment for production use.

    • config:

      • retention.ms: 7200000 (2 hours) is the retention time for messages in milliseconds. Adjust this based on your data retention needs.

      • segment.bytes: 1073741824 (1 GiB) is the segment file size in bytes. This is how large each segment file for the topic will be.

Apply the Configuration

kubectl apply -f kafka-topic.yaml

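You can confirm that the Topic Operator has picked up the topic by listing the KafkaTopic resources:

kubectl get kafkatopics -n kafka
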
To produce events (messages) to your Kafka topic, run the Kafka console producer:

kafka-console-producer --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

Command Breakdown

  • kafka-console-producer: This is a command-line tool that comes with Apache Kafka (shipped as kafka-console-producer.sh in the bin directory). It's used to send messages to a Kafka topic.

  • --broker-list my-cluster-kafka-bootstrap:9092: This option specifies the address of the Kafka broker(s) to which you want to send messages. In this example:

    • my-cluster-kafka-bootstrap: This is the service name of the Kafka broker in a Kubernetes environment managed by Strimzi. It acts as a load balancer to connect to one of the Kafka brokers in your cluster.

    • :9092: This is the default port number on which the Kafka broker listens for incoming messages. Ensure this port is accessible from where you are running the producer.

  • --topic my-topic: This option specifies the name of the Kafka topic to which you want to send messages. In this example, the topic is named my-topic.

What Happens When You Run This Command

  1. Connection to Kafka Broker: The console producer connects to the Kafka broker specified in the --broker-list argument.

  2. Prompt for Input: After running the command, you will be prompted to enter messages in the terminal.

  3. Message Production: Each line you type in the terminal and press Enter is sent as a message to the specified Kafka topic (my-topic).

  4. Exit the Producer: To exit the producer, you can typically use Ctrl + C to stop the process.

Use Case

  • Testing and Debugging: The Kafka Console Producer is often used for testing and debugging Kafka topics to ensure they are correctly configured and can receive messages.

  • Quick Message Production: It provides a quick way to produce messages to a Kafka topic without writing any code.

For example, at the producer prompt type:

Hello, Kafka!

After pressing Enter, the message "Hello, Kafka!" is sent to the my-topic Kafka topic.

This tool is very useful in development and testing scenarios to verify that your Kafka setup is working correctly and to see how messages flow through your Kafka infrastructure. If you want to automate message production or integrate it with applications, use one of the Kafka client libraries instead.

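Note that my-cluster-kafka-bootstrap:9092 is an in-cluster DNS name, so the producer has to run somewhere that can resolve it. A convenient option is a throwaway pod using the Strimzi Kafka image, roughly as in the Strimzi quickstart (a sketch only; adjust the image tag to match your Strimzi and Kafka versions, and note that newer Kafka releases use --bootstrap-server instead of --broker-list):

# Interactive console producer inside the cluster (the image tag is an assumption)
kubectl -n kafka run kafka-producer -ti --rm=true --restart=Never \
  --image=quay.io/strimzi/kafka:latest-kafka-3.7.1 \
  -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

# Matching console consumer to read the messages back
kubectl -n kafka run kafka-consumer -ti --rm=true --restart=Never \
  --image=quay.io/strimzi/kafka:latest-kafka-3.7.1 \
  -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
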
Off-Topic:

I had an issue where my my-cluster-kafka pods were stuck in ContainerCreating. After doing some digging through events and logs, I realized the disks backing my Longhorn storage class didn't have enough space for these pods to spin up:

NAME                 READY   STATUS              RESTARTS   AGE
my-cluster-kafka-0   0/1     ContainerCreating   0          5m
my-cluster-kafka-1   0/1     ContainerCreating   0          5m
my-cluster-kafka-2   0/1     ContainerCreating   0          5m

I had to add a new disk to both of my nodes in Longhorn to make more space available for these pods to run.
These are the steps I followed:

Step-by-Step Guide to Configure New Disks on Ubuntu 20.04

Step 1: Identify the New Disks

  1. List Available Disks: Use the lsblk command to list all available block devices and identify your new disk(s).

     lsblk
    

    Look for the disk(s) without a mount point. They might be named something like /dev/sdb, /dev/sdc, etc.

  2. Check Disk Details: Verify the details of the new disk using the fdisk command.

     sudo fdisk -l /dev/sdb
    

Step 2: Partition the New Disk (if needed)

  1. Partition the Disk: Use fdisk or parted to create a partition on the new disk.

     sudo fdisk /dev/sdb
    

    Follow these steps within fdisk:

    • Type n to create a new partition.

    • Press Enter to accept the default values for partition type and number.

    • Type w to write the changes.

  2. Verify Partition: List the partitions to verify:

     lsblk
    

Step 3: Format the New Partition

  1. Format the Partition: Format the new partition with a filesystem, typically ext4.

     sudo mkfs.ext4 /dev/sdb1
    
  2. Label the Partition (Optional): You can assign a label to the partition for easy identification.

     sudo e2label /dev/sdb1 data-disk
    

Step 4: Mount the New Disk

  1. Create a Mount Point: Create a directory where you want to mount the new disk.

     sudo mkdir /mnt/data-disk
    
  2. Mount the Disk: Mount the disk to the directory you created.

     sudo mount /dev/sdb1 /mnt/data-disk
    
  3. Verify the Mount: Check if the disk is mounted correctly.

     df -h
    

Step 5: Update /etc/fstab for Persistent Mounting

  1. Get UUID of the Disk: Obtain the UUID of the new disk partition.

     sudo blkid /dev/sdb1
    
  2. Edit /etc/fstab: Open /etc/fstab in a text editor.

     sudo nano /etc/fstab
    
  3. Add Entry to /etc/fstab: Add an entry to mount the disk automatically at boot. Replace <UUID> with the UUID obtained earlier.

     UUID=<UUID> /mnt/data-disk ext4 defaults 0 2
    
  4. Test the /etc/fstab Configuration: Test the configuration by unmounting and then remounting all filesystems.

     sudo umount /mnt/data-disk
     sudo mount -a
    

    Ensure there are no errors.

Step 6: Configure Longhorn to Use the New Disk

  1. Access Longhorn UI: Open the Longhorn UI in your browser.

  2. Add Disk in Longhorn (a CLI alternative is sketched after this list):

    • Navigate to the Node section.

    • Select the node where you added the disk.

    • Add the disk by specifying the mount path /mnt/data-disk.

  3. Verify Disk Configuration: Ensure the disk is schedulable and has the correct tags and settings.

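If you'd rather skip the UI, the disk can also be added by patching the Longhorn Node custom resource (a sketch; <node-name> and the disk name data-disk are placeholders, and the field names follow the Longhorn Node spec, so verify them against your Longhorn version):

# Add a disk entry pointing at the new mount path and allow scheduling on it
kubectl -n longhorn-system patch nodes.longhorn.io <node-name> --type merge \
  -p '{"spec":{"disks":{"data-disk":{"path":"/mnt/data-disk","allowScheduling":true,"storageReserved":0}}}}'
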
Once this is complete, you will be able to see your pods running successfully.

Setting up Prometheus and Grafana using the Prometheus Operator is a robust way to monitor your Kafka cluster. This method leverages Kubernetes custom resources to manage Prometheus and its monitoring configuration. Here's a concise walkthrough of the steps:

Step 1: Set Up Prometheus Operator

  1. Download Prometheus Operator YAML Bundle:

     curl -s https://raw.githubusercontent.com/coreos/prometheus-operator/master/bundle.yaml > prometheus-operator-deployment.yaml
    
  2. Update Namespace:

    Modify the YAML file to use your observability namespace:

     sed -i -E '/[[:space:]]*namespace: [a-zA-Z0-9-]*$/s/namespace:[[:space:]]*[a-zA-Z0-9-]*$/namespace: observability/' prometheus-operator-deployment.yaml
    
  3. Deploy Prometheus Operator:

     Create the observability namespace if it does not already exist, then apply the bundle:

     kubectl create namespace observability
     kubectl -n observability apply -f prometheus-operator-deployment.yaml
    

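Check that the operator deployment is up before creating any monitoring resources:

kubectl -n observability get deployment prometheus-operator
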
Step 2: Create PodMonitor Resources for Kafka

  1. Create the PodMonitor YAML file (strimzi-pod-monitor.yaml):

     apiVersion: monitoring.coreos.com/v1
     kind: PodMonitor
     metadata:
       name: cluster-operator-metrics
       labels:
         app: strimzi
     spec:
       selector:
         matchLabels:
           strimzi.io/kind: cluster-operator
       namespaceSelector:
         matchNames:
           - kafka
       podMetricsEndpoints:
       - path: /metrics
         port: http
     ---
     apiVersion: monitoring.coreos.com/v1
     kind: PodMonitor
     metadata:
       name: entity-operator-metrics
       labels:
         app: strimzi
     spec:
       selector:
         matchLabels:
           app.kubernetes.io/name: entity-operator
       namespaceSelector:
         matchNames:
           - kafka
       podMetricsEndpoints:
       - path: /metrics
         port: healthcheck
     ---
     apiVersion: monitoring.coreos.com/v1
     kind: PodMonitor
     metadata:
       name: kafka-resources-metrics
       labels:
         app: strimzi
     spec:
       selector:
         matchExpressions:
           - key: "strimzi.io/kind"
             operator: In
             values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
       namespaceSelector:
         matchNames:
           - kafka
       podMetricsEndpoints:
       - path: /metrics
         port: tcp-prometheus
         relabelings:
         - separator: ;
           regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
           replacement: $1
           action: labelmap
         - sourceLabels: [__meta_kubernetes_namespace]
           separator: ;
           regex: (.*)
           targetLabel: namespace
           replacement: $1
           action: replace
         - sourceLabels: [__meta_kubernetes_pod_name]
           separator: ;
           regex: (.*)
           targetLabel: kubernetes_pod_name
           replacement: $1
           action: replace
         - sourceLabels: [__meta_kubernetes_pod_node_name]
           separator: ;
           regex: (.*)
           targetLabel: node_name
           replacement: $1
           action: replace
         - sourceLabels: [__meta_kubernetes_pod_host_ip]
           separator: ;
           regex: (.*)
           targetLabel: node_ip
           replacement: $1
           action: replace
    
  2. Apply the PodMonitor Configurations:

     kubectl -n observability apply -f strimzi-pod-monitor.yaml
    

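You can list the PodMonitor objects to confirm they were created:

kubectl -n observability get podmonitors
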
Step 3: Deploy Prometheus Server

  1. Create Prometheus Configuration YAML (prometheus.yaml):

      apiVersion: rbac.authorization.k8s.io/v1
     kind: ClusterRole
     metadata:
       name: prometheus-server
       labels:
         app: strimzi
     rules:
       - apiGroups: [""]
         resources:
           - nodes
           - nodes/proxy
           - services
           - endpoints
           - pods
         verbs: ["get", "list", "watch"]
       - apiGroups:
           - extensions
         resources:
           - ingresses
         verbs: ["get", "list", "watch"]
       - nonResourceURLs: ["/metrics"]
         verbs: ["get"]
     ---
     apiVersion: v1
     kind: ServiceAccount
     metadata:
       name: prometheus-server
       labels:
         app: strimzi
     ---
     apiVersion: rbac.authorization.k8s.io/v1
     kind: ClusterRoleBinding
     metadata:
       name: prometheus-server
       labels:
         app: strimzi
     roleRef:
       apiGroup: rbac.authorization.k8s.io
       kind: ClusterRole
       name: prometheus-server
     subjects:
       - kind: ServiceAccount
         name: prometheus-server
         namespace: observability
     ---
     apiVersion: monitoring.coreos.com/v1
     kind: Prometheus
     metadata:
       name: prometheus
       labels:
         app: strimzi
     spec:
       replicas: 1
       serviceAccountName: prometheus-server
       podMonitorSelector:
         matchLabels:
           app: strimzi
       serviceMonitorSelector: {}
       resources:
         requests:
           memory: 400Mi
       enableAdminAPI: false
    
  2. Apply the Prometheus Configuration:

     kubectl -n observability apply -f prometheus.yaml
    

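Once the Kafka metrics configuration from Step 4 is in place, you can confirm that Prometheus is scraping the Kafka pods by port-forwarding to the prometheus-operated service (the name the operator gives the Prometheus instance's service by default) and opening the Targets page:

kubectl -n observability port-forward svc/prometheus-operated 9090:9090
# then browse to http://localhost:9090/targets
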
Step 4: Configure Kafka for Metrics

  1. Update Kafka Configuration to Expose Metrics:

    Ensure your Kafka custom resource (kafka-cluster.yaml) includes the metricsConfig block shown below; the kafka-metrics ConfigMap it references is created in the detailed walkthrough after the Grafana setup. Then re-apply the Kafka resource:

     metricsConfig:
       type: jmxPrometheusExporter
       valueFrom:
         configMapKeyRef:
           name: kafka-metrics
           key: kafka-metrics-config.yml
    
     kubectl apply -f kafka-cluster.yaml -n kafka
    

Step 5: Deploy and Configure Grafana

  1. Create Grafana YAML (grafana.yaml):

      apiVersion: apps/v1
     kind: Deployment
     metadata:
       name: grafana
       namespace: observability
       labels:
         app: grafana
     spec:
       replicas: 1
       selector:
         matchLabels:
           app: grafana
       template:
         metadata:
           labels:
             app: grafana
         spec:
           containers:
             - name: grafana
               image: grafana/grafana:latest
               ports:
                 - containerPort: 3000
     ---
     apiVersion: v1
     kind: Service
     metadata:
       name: grafana
       namespace: observability
       labels:
         app: grafana
     spec:
       ports:
         - port: 80
           targetPort: 3000
       selector:
         app: grafana
    
  2. Apply the Grafana Configuration:

     kubectl -n observability apply -f grafana.yaml
    
  3. Access Grafana:

    Use port-forwarding if not exposed via a LoadBalancer or Ingress:

      kubectl port-forward svc/grafana 3000:80 -n observability
    

    Access Grafana at http://localhost:3000 (or at the forwarding host's address, e.g. http://192.168.1.60:3000 if you bind the port-forward to a node IP). The default credentials are admin/admin.

To configure your Kafka cluster to expose metrics for Prometheus using Strimzi, you'll need to modify the Kafka custom resource to include the metricsConfig. This involves creating a ConfigMap with the necessary metrics configuration and updating the Kafka resource to use this ConfigMap.

Here are the steps to achieve this:

Step 1: Create a ConfigMap for Metrics

First, create a ConfigMap that contains the jmxPrometheusExporter configuration. This configuration specifies which metrics to collect and how they should be formatted.

  1. Create the ConfigMap YAML file (kafka-metrics-config.yml):
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: kafka
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: "kafka.server<type=(.+), name=(.+)><>(Count)"
      name: "kafka_server_$1_$2_count"
      type: GAUGE
      labels:
        instance: "$0"
    # Add more rules here as needed
  2. Apply the ConfigMap to the Kubernetes cluster:
kubectl apply -f kafka-metrics-config.yml

Step 2: Update the Kafka Custom Resource

Modify your Kafka custom resource to include the metricsConfig section, referencing the ConfigMap you just created.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.7.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.7"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
          class: longhorn
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
      class: longhorn
  entityOperator:
    topicOperator: {}
    userOperator: {}
  1. Apply the updated Kafka custom resource to the Kubernetes cluster:
kubectl apply -f kafka-cluster.yaml

Step 3: Verify the Configuration

After applying the configuration, the Strimzi operator should restart the Kafka pods with the new configuration. You can verify that the metrics are being exposed by checking the logs and metrics endpoints of your Kafka pods.

  1. Check the status of the Kafka pods:
kubectl get pods -n kafka
  2. Verify metrics exposure by port-forwarding or accessing the metrics endpoint directly:
kubectl port-forward my-cluster-kafka-0 9404:9404 -n kafka

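With the port-forward in place, you can check the exporter output directly:

curl -s http://localhost:9404/metrics | head
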
Finally, we will need to add the Prometheus endpoint as a data source in Grafana to visualize the Kafka metrics.
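In Grafana, add a Prometheus data source that points at the Prometheus service created by the operator (the service name and in-cluster URL below are assumptions based on the operator's defaults):

# Grafana UI: Configuration -> Data Sources -> Add data source -> Prometheus
# Data source URL (in-cluster DNS name of the operator-created service):
http://prometheus-operated.observability.svc:9090

With the data source configured, you can import Kafka dashboards and start visualizing broker, topic, and consumer metrics.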