Apache Kafka Installation and Testing | Laplace's Lab

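This post records my recent attempt to set up a Kafka platform for simulating event streaming. The simulated scenario: suppose someone has been infected with COVID-19. A Kafka Producer publishes a notification of that event, and a Kafka Consumer receives the message and displays it.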

What is Apache Kafka?

From the Apache Kafka official website:

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

From the Chinese Wikipedia page (translated):

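Kafka was originally developed at LinkedIn, written in Scala and Java, and open-sourced in early 2011. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data.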

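Kafka has several core concepts:

  • Broker: the server host that stores the data
  • Producer: the publisher of messages
  • Consumer: the reader of messages
  • Topic: the category that messages are published under
  • Partition: the messages in a Topic are split across several Partitions to make message processing more efficient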
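To make these terms concrete, here is a toy, in-memory model of one Topic with several Partitions, a producer-side append and a consumer that tracks offsets. It is only an illustration under loud assumptions: `ToyTopic` and `ToyConsumer` are hypothetical names, nothing is stored on disk or spread across Brokers, keyless messages are placed round-robin as a simplification, and keys are hashed with `zlib.crc32` rather than the murmur2 hash that Kafka's default partitioner uses.

```python
import zlib


class ToyTopic:
    """Hypothetical in-memory stand-in for one Topic: a list of append-only partition logs."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]
        self._next = 0  # round-robin cursor for messages without a key (simplified)

    def append(self, value, key=None):
        """Producer side: choose a partition, append the value, return (partition, offset)."""
        if key is None:
            partition = self._next % len(self.partitions)
            self._next += 1
        else:
            # Equal keys always map to the same partition. Kafka's default partitioner
            # hashes keys with murmur2; zlib.crc32 is a stand-in to avoid dependencies.
            partition = zlib.crc32(key) % len(self.partitions)
        log = self.partitions[partition]
        log.append(value)
        return partition, len(log) - 1  # offset = position within that partition


class ToyConsumer:
    """Consumer side: tracks the next offset to read in every partition."""

    def __init__(self, topic):
        self.topic = topic
        self.positions = [0] * len(topic.partitions)

    def poll(self):
        """Return (partition, offset, value) for every record not yet read, then advance."""
        records = []
        for p, log in enumerate(self.topic.partitions):
            for offset in range(self.positions[p], len(log)):
                records.append((p, offset, log[offset]))
            self.positions[p] = len(log)
        return records


# Example: two alerts with the same key land in the same partition at offsets 0 and 1.
topic = ToyTopic('covid-19', num_partitions=2)
topic.append(b'case 1', key=b'taipei')
topic.append(b'case 2', key=b'taipei')
consumer = ToyConsumer(topic)
print(consumer.poll())
```

Ordering is only guaranteed within a partition, which is why messages that must stay in order share a key; a second `poll()` returns nothing until new records are appended.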

Installation

Get Started

Change into the root directory of the extracted Kafka folder.

  • Start the ZooKeeper server

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka server (in a second terminal, since the ZooKeeper command keeps running in the foreground)

    $ bin/kafka-server-start.sh config/server.properties

Apache ZooKeeper is the service that manages the configuration and resource allocation of the distributed Kafka cluster (the Brokers).
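Because the broker connects to ZooKeeper when it starts (the stock config/server.properties points it at localhost:2181), it can help to confirm that ZooKeeper is accepting connections before starting Kafka, and that the broker is listening on localhost:9092 before creating topics. The helper below is a hypothetical, minimal sketch using only the standard library; it checks that a TCP port accepts connections, nothing more, so it cannot tell whether the service behind the port is healthy.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False if timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError) if nothing listens.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example usage, assuming the default ports used in this post:
#   wait_for_port("localhost", 2181)  # ZooKeeper
#   wait_for_port("localhost", 9092)  # Kafka broker
```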
*If starting the server fails with the following error:

Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

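Following a related Stack Overflow discussion, edit the file bin/kafka-run-class.sh and find this condition:

if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]]

If the script fails to extract the Java major version, this check is false and the script falls back to the pre-Java-9 GC logging flags (such as -XX:+PrintGCDateStamps), which Java 9 and later no longer recognize. Replace the line just above the condition, which sets JAVA_MAJOR_VERSION, with:
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([^.-]*).*/\1/p')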
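The decision the script makes can be sketched in Python. This is a hypothetical re-implementation for illustration only, not code from kafka-run-class.sh: it takes the first segment of the quoted version string (so Java 8's "1.8.0_152" yields 1, just as the script expects for pre-9 releases) and, like the shell `if`, sends any version it cannot parse to the legacy branch, which is how the legacy PrintGC* flags end up on a Java 9+ command line.

```python
import re

# Capture the first segment of the quoted version string, stopping at '.', '-' or '"'.
_VERSION_RE = re.compile(r'version "([^".-]*)')


def java_major_version(first_line):
    """Return the leading number of a `java -version` line as an int, or None."""
    match = _VERSION_RE.search(first_line)
    if match is None or not match.group(1).isdigit():
        return None
    return int(match.group(1))


def gc_log_style(first_line):
    """Mimic the script's branch: Java 9+ gets unified -Xlog:gc*, anything else the legacy flags."""
    major = java_major_version(first_line)
    # As in the shell `if`, a version that cannot be parsed falls through to the legacy branch.
    return "unified" if major is not None and major >= 9 else "legacy"


for line in ('java version "1.8.0_152"',
             'openjdk version "11.0.2" 2019-01-15',
             'openjdk version "12" 2019-03-19'):
    print(line, '->', java_major_version(line), gc_log_style(line))
```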
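  • Create a Topic
    The following command creates a Topic named covid-19 with one replica and one partition (the --zookeeper option shown here belongs to older Kafka releases; Kafka 3.0 removed it from kafka-topics.sh, where --bootstrap-server localhost:9092 is used instead):

    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic covid-19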
  • List the existing Topics to confirm it was created

    $ bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Producer

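    #!/usr/bin/env python3
    # coding:utf-8
    # Publish a COVID-19 alert to the covid-19 topic every 5 seconds until Ctrl+C.

    import time

    from kafka import KafkaProducer  # provided by the kafka-python package

    # Connect to the broker started above (listening on localhost:9092).
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    counter = 1
    try:
        while True:
            msg = f'[Total: {counter}] Someone has contracted COVID-19! Be careful!'
            # Kafka message values are bytes, so encode the string before sending.
            producer.send('covid-19', msg.encode('ascii'))
            time.sleep(5)
            counter += 1
    except KeyboardInterrupt:
        pass
    finally:
        # send() only queues the message; flush() waits until buffered messages are delivered.
        producer.flush()
        producer.close()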
  • Consumer

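    #!/usr/bin/env python3
    # coding:utf-8
    # Print every message that arrives on the covid-19 topic.

    from kafka import KafkaConsumer  # provided by the kafka-python package

    # Subscribe to covid-19 on the local broker. With no group_id and the default
    # auto_offset_reset='latest', only messages produced after startup are shown.
    consumer = KafkaConsumer('covid-19', bootstrap_servers='localhost:9092')
    for msg in consumer:
        # msg.value holds the raw bytes sent by the producer.
        print(msg.value.decode('ascii'))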
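Looking back at the `--replication-factor 1 --partitions 1` flags used to create covid-19: each replica of a partition must live on a different Broker, so on this single-Broker setup the replication factor cannot exceed 1. The sketch below shows the idea with a hypothetical `assign_replicas` helper and a simplified round-robin placement; Kafka's real assignment algorithm differs in details (for example, it randomizes the starting Broker and can be rack-aware), so treat this only as an illustration of the constraint.

```python
def assign_replicas(num_partitions, replication_factor, broker_ids):
    """Simplified round-robin placement: partition p gets `replication_factor` distinct
    brokers starting at broker index p. The first broker listed is the preferred leader."""
    if num_partitions < 1 or replication_factor < 1:
        raise ValueError("partitions and replication factor must be >= 1")
    if replication_factor > len(broker_ids):
        # Mirrors the error Kafka reports when there are too few brokers for the replicas.
        raise ValueError(
            f"replication factor {replication_factor} larger than "
            f"available brokers {len(broker_ids)}"
        )
    n = len(broker_ids)
    return {
        p: [broker_ids[(p + r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    }


# The covid-19 topic from this post on a single broker (id 0):
print(assign_replicas(1, 1, [0]))
```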

With the Producer and Consumer running side by side, the Consumer received every COVID-19 event notification ⬇︎

Ref. Hello World In Kafka Using Python
