【Kafka】 快速开始

Metadata

title: 【Kafka】 快速开始
date: 2023-02-14 12:58
tags:
  - 行动阶段/完成
  - 主题场景/组件
  - 笔记空间/KnowladgeSpace/ProgramSpace/ModuleSpace
  - 细化主题/Module/Kafka
categories:
  - Kafka
keywords:
  - Kafka
description: 【Kafka】 快速开始

【Kafka】 快速开始

STEP 1: GET KAFKA

Download the latest Kafka release and extract it:

$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0

STEP 2: START THE KAFKA ENVIRONMENT

NOTE: Your local environment must have Java 8+ installed.

Apache Kafka可以使用ZooKeeper或KRaft启动。要开始使用任何一种配置,请遵循下面的一节,但不要同时遵循两节。

Kafka with ZooKeeper

执行以下命令,按照正确的顺序启动所有服务:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务都成功启动,您就有了一个基本的Kafka环境,可以使用了。

Kafka with KRaft

生成集群UUID

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式日志目录

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

启动Kafka服务器

$ bin/kafka-server-start.sh config/kraft/server.properties

一旦Kafka服务器成功启动,您将有一个基本的Kafka环境运行并准备使用。

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka是一个分布式的事件流平台,允许您在多台机器上读取、写入、存储和处理事件(在文档中也称为记录或消息)。

事件示例包括支付交易、移动电话的地理位置更新、运输订单、物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件就是文件夹中的文件。

因此,在编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka的所有命令行工具都有额外的选项:运行不带任何参数的Kafka -topics.sh命令来显示使用信息。例如,它还可以显示新主题的分区计数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Kafka客户端通过网络与Kafka代理通信以写入(或读取)事件。一旦接收到事件,代理将以持久和容错的方式存储事件,只要您需要—甚至永远。

运行控制台生成器客户端,在主题中写入一些事件。默认情况下,您输入的每一行都将导致将一个单独的事件写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

STEP 5: READ THE EVENTS

打开另一个终端会话,运行控制台消费者客户端来读取刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以随意尝试:例如,切换回您的生产者终端(上一步)来编写其他事件,并查看事件如何立即显示在您的消费者终端中。
因为事件被持久地存储在Kafka中,它们可以被你想读多少次,被多少个消费者读多少次。您可以通过打开另一个终端会话并再次重新运行前面的命令来轻松验证这一点。

STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

您可能在现有系统(如关系数据库或传统消息传递系统)中有大量数据,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统摄取数据到Kafka,反之亦然。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。因此,将现有系统与Kafka集成起来非常容易。为了使这个过程更容易,有数百个这样的连接器可用。

在这个快速入门中,我们将看到如何使用简单的连接器来运行Kafka Connect,这些连接器可以将数据从文件导入到Kafka主题,并将数据从Kafka主题导出到文件。

首先,确保将connect-file- 3.0.0 .jar添加到插件中。Connect工作者配置中的path属性。出于快速入门的目的,我们将使用一个相对路径,并将连接器的包视为一个优步jar,当快速入门命令从安装目录运行时,它会工作。但是,值得注意的是,对于生产部署来说,使用绝对路径总是更可取的。看到插件。路径中详细说明了如何设置此配置。

编辑config/connect-standalone。属性文件,添加或更改插件。路径配置属性匹配如下,并保存文件:

> echo "plugin.path=libs/connect-file-3.4.0.jar"

然后,开始创建一些用于测试的种子数据:

> echo -e "foo\nbar" > test.txt

或者在Windows上:

> echo foo> test.txt
> echo bar>> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。首先是Kafka Connect进程的配置,包括要连接的Kafka代理和数据的序列化格式等常见配置。其余的配置文件每个都指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka附带的这些示例配置文件使用了您前面开始的默认本地集群配置,并创建了两个连接器:第一个是源连接器,从输入文件中读取行并将每一行生成到Kafka主题中;第二个是接收器连接器,从Kafka主题中读取消息并将每一行生成到输出文件中。

在启动期间,您将看到许多日志消息,包括一些指示连接器正在实例化的消息。一旦Kafka Connect进程启动,源连接器应该开始从test.txt中读取行并将它们生成到主题Connect -test中,而接收器连接器应该开始从主题Connect -test中读取消息并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据已经通过整个管道交付:

> more test.sink.txt
foo
bar

注意,数据被存储在Kafka主题connect-test中,所以我们也可以运行一个控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,所以我们可以将数据添加到文件中,并看到它通过管道移动:

> echo Another line>> test.txt

您应该看到这一行出现在控制台使用者输出和接收器文件中。

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS

一旦数据以事件的形式存储在Kafka中,你就可以使用Java/Scala的Kafka Streams客户端库来处理数据。它允许您实现关键任务的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简便性,以及Kafka的服务器端集群技术的优点,使这些应用程序具有高度可扩展性、弹性、容错性和分布式。该库支持“只处理一次”、有状态操作和聚合、窗口、连接、基于事件时间的处理,等等。

为了给你一个初步的体验,下面是如何实现流行的WordCount算法:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

STEP 8: TERMINATE THE KAFKA ENVIRONMENT

现在您已经完成了快速入门,可以随意拆除Kafka环境,或者继续尝试。

  1. 使用Ctrl-C停止生产者和消费者客户机(如果还没有这样做的话)。
  2. 用Ctrl-C停止Kafka代理。
  3. 最后,如果遵循了Kafka with ZooKeeper部分,请使用Ctrl-C停止ZooKeeper服务器。

如果你还想删除你的本地Kafka环境的任何数据,包括你在这个过程中创建的任何事件,运行命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs