掘金 后端 ( ) • 2024-04-18 14:01

什么是kafka?

Kafka是由Apache软件基金会开发的开源流式数据平台。它最初由LinkedIn开发,用于处理大规模的实时数据流。Kafka被设计为高可靠、高吞吐量的分布式消息系统,用于发布、订阅流式数据。它具有持久性、水平扩展性和容错能力,能够处理数以千计的消息并支持多个消费者。

Kafka的核心概念包括以下几个部分:

  1. Producer(生产者) :负责将数据发布到Kafka的主题(topic)。
  2. Broker(代理) :Kafka集群中的服务器节点,负责存储数据和处理数据流。
  3. Topic(主题) :数据流的类别,数据被发布到主题并由消费者订阅。
  4. Consumer(消费者) :从Kafka主题中读取消息的应用程序。
  5. ZooKeeper:Kafka使用ZooKeeper来管理和协调Kafka broker。

Kafka被广泛应用于日志聚合、数据管道、事件驱动架构等场景,许多大型互联网公司和企业都在生产环境中使用Kafka来处理实时数据流

kafka发送数据的多方式

Kafka生产者发送数据到Kafka集群有多种方式,其中常见的包括:

  1. 同步发送:在同步发送中,生产者发送消息并等待直到消息被成功写入至少一个副本后才继续。这种方式可以确保消息不会丢失,但会增加延迟,因为生产者需要等待确认。
  2. 异步发送:在异步发送中,生产者发送消息后继续执行而不等待确认。这种方式可以提高性能,但可能会导致消息丢失,因为生产者在确认消息是否成功写入之前不会知道。
  3. 批量发送:生产者可以将多个消息打包成批次进行发送,以减少网络开销和提高吞吐量。批量发送可以通过配置生产者的参数来实现。
  4. 带回调函数的发送:生产者可以通过指定回调函数来处理发送结果。当消息成功发送或发送失败时,回调函数会被调用,可以用于处理成功发送的消息或处理发送失败的消息。
  5. 分区器自定义:生产者可以通过自定义分区器来控制消息发送到哪个分区。通过自定义分区器,可以实现特定的消息路由逻辑,以满足业务需求。

这些方式可以根据具体的业务需求和性能要求来选择和组合使用,以实现高效可靠的数据发送到Kafka集群。

安装MySql

手动下载MySql

自动安装mysql

brew install mysql

配置mysql环境变量(.bash_profile)

export PATH=$PATH:/usr/locol/mysql/bin

export PATH=$PATH:/usr/local/mysql/support-files

查看mysql的版本,打开终端,输入

mysql --version 

初始化 MySQL 数据库

mysql_secure_installation

启动 MySQL 服务

brew services start mysql

登录到MySQL服务器

mysql -u root -p

修改数据库用户密码

mysqladmin -u 这里输入用户名 -p password 这里输入数据库密码

安装Navicat Premium

Navicat Premium 是一款功能强大的数据库管理工具,它提供了一个集成化的环境,用于在多种数据库系统中进行数据库开发、管理和维护。Navicat Premium 支持主流的数据库系统,包括 MySQL、MariaDB、Oracle、SQL Server、PostgreSQL 等。

例如,创建一个名为“user_database”的数据库,并创建了两张表,分别是“contact_table”,“login_table”


Python建立MySql数据库连接

在 Python 环境中安装名为 "mysql-connector-python" 的软件包。这个软件包是用于在 Python 中连接和操作 MySQL 数据库的官方驱动程序

pip install mysql-connector-python

pymysql 是一个用于在 Python 中连接和操作 MySQL 数据库的库。它是一个纯 Python 实现的 MySQL 客户端,提供了简单而强大的接口,使得在 Python 中进行数据库操作变得更加方便

pip install pymysql

还需要安装一下 cryptography包,因为 python连接数据库是的加密方式需要 cryptography包。如果你不安装cryptography,python连接数据库会报错

pip install cryptography

执行python代码

import pymysql

try:
    # 建立MySQL连接
    connection = pymysql.connect(
        host='localhost',  # 数据库主机地址
        port=3306,  # 数据库端口
        user='root',  # 数据库用户名
        password='这里填入先前自己设定的密码',  # 数据库密码
        database='user_database',  # 数据库名称
        charset='utf8mb4'  # 数据库字符集
    )
    # 定义插入数据的SQL语句
    insert_query = """
    INSERT INTO contact_table
    (agent_name, mobile, sex, email, whats_app, hide_mobile, agent_num, company_name, company_num, company_address, contact_check, add_time)
    VALUES
    ('John Doe', '1234567890', 'Male', '[email protected]', '1234567890', 0, 123, 'ABC Company', 456, '123 Main St', 'check', NOW())
    """
    # 创建游标对象并执行SQL语句
    cursor = connection.cursor()
    cursor.execute(insert_query)
    # 提交事务
    connection.commit()
    print("数据插入成功")
except Exception as e:
    print("连接MySQL数据库失败:", e)

插入成功后,可以通过数据库查看到插入的数据

安装 zookeeper

安装kafka需要安装zookeeper,因为Kafka依赖于Zookeeper来管理其分布式节点,并存储元数据以确保一致性

然后进入 Zookeeper 的配置目录,一般是 conf 文件夹,复制 zoo_sample.cfg 并重命名为 zoo.cfg

其中,zoo.cfg配置文件中各配置项的含义如下所示:

# zookeeper时间配置中的基本单位(毫秒)
tickTime=2000

# 允许follower初始化连接到leader最大时长,它表示tickTime时间的倍数 即:initLimit*tickTime
initLimit=10

# 运行follower与leader数据同步最大时长,它表示tickTime时间倍数 即:syncLimit*tickTime
syncLimit=5

# zookeeper数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中)
dataDir=/tmp/zookeeper

# 对客户端提供的端口号
clientPort=2181

# 单个客户端于zookeeper最大并发连接数
maxClientCnxns=60

# 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3

# 自动出发清除任务时间间隔,以小时为单位。默认为0,表示不自动清除
autopurge.purgeInterval=1

## Metrics Providers
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

## ttl settings
extendedTypesEnabled=true

## 由于AdminServer默认使用8080端口,此处修改为8888
admin.serverPort=8888

配置好zoo.conf配置文件后,执行以下命令启动ZooKeeper

cd /Users/xxx/Desktop/apache-zookeeper-3.9.2-bin/bin

./zkServer.sh start

启动CLI

./zkCli.sh

停止Zookeeper服务器

./zkServer.sh stop

安装 kafka

  • 首先,你需要从 Apache Kafka 的官方网站下载 Kafka 的压缩包

  • 解压下载的压缩包到本地路径,然后进入config目录下,编辑server.properties配置文件
属性 默认值 描述 broker.id 0 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的名字,你可以选择任意数字作为id,但是一定要保证唯一性; log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间使用逗号分隔;每当创建新的partition时,都会选择在包含最少partitions的路径下进行; listeners PLAINTEXT://192.168.65.60:9092 server接受客户端连接的端口,ip配置kafka本机ip即可 zookeeper.connect localhost:2181 zookeeper连接字符串的格式为:hostname:port,此处分别对应zk集群中的节点;连接方式为:hostname1:port1,hostname2:port2,hostname3:port3 log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样 num.partitions 1 创建topic的默认分区数 default.replication.factor 1 自动创建topic的默认副本数量,建立设置为大于等于2 min.insync.replicas 1 当producer设置acks=-1时,min.insync.replicas指定的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 delete.topic.enable false 是否允许删除主题

在以上的配置项中,我们最主要需要关注如下几个配置内容

【broker的序号】broker.id=0
【当前kafka的监听地址】listeners=PLAINTEXT://localhost:9092
【日志的存储路径】log.dirs=/Users/muse/kafka_2.13-3.0.0/kafka-logs
【zookeeper的服务地址】zookeeper.connect=localhost:2181

  • 找到项目根目录,运行以下命令,启动 Kafka Broker
cd /Users/xxx/Desktop/kafka-3.7.0-src/bin
  
./kafka-server-start.sh ../config/server.properties

停止kafak服务器

./kafka-server-stop.sh  ../config/server.properties

安装EFAK(原名 Kafka Eagle)

EAFK 是一个开源的 Kafka 集群管理和监控工具,旨在帮助用户更好地管理和监控其 Kafka 集群。

  • 实时监控: 实时监控 Kafka 集群的状态、健康状况以及性能指标。
  • 消费者组管理: 查看和管理消费者组、消费者、消费者偏移等信息。
  • Topic 管理: 创建、修改、删除 Kafka Topic,并查看 Topic 详细信息。
  • 告警系统: 支持配置告警规则,及时发现集群问题并通知管理员。
  • 图表和报表: 提供可视化的图表和报表,帮助用户更好地理解集群情况。
  • 用户权限: 支持多用户和角色权限管理,确保安全性。
  • 易于部署: 提供简单的安装和部署流程,适用于各种规模的 Kafka 集群。
  • 跨平台: 支持 Linux、Windows、Mac OS X 等多种平台。
  • 跨版本: 支持 Kafka KRaft 模式。

创建mysql数据库ke 用来储存元数据

修改EFAK的conf目录下配置文件——system-config.properties

efak.username=root
efak.password=填写mysql的密码

配置环境变量

export KE_HOME=/Users/xxx/Desktop/kafka-eagle-bin-3.0.1/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin

单机版启动EFAK

cd /Users/xxx/Desktop/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin

./ke.sh start

集群方式启动

./ke.sh cluster start
./ke.sh cluster restart

启动成功界面,如下图所示:

在浏览器中输入 http://127.0.0.1:8048

然后输入账号密码:admin / 123456,进行登陆

EFAK常用命令

命令 描述 ke.sh start 启动EFAK服务器 ke.sh status 查看EFAK运行状态 ke.sh stop 停止EFAK服务器 ke.sh restart 重新启动EFAK服务器 ke.sh stats 查看linux操作系统中的EFAK句柄数 ke.sh cluster start 查看EFAK集群分布式启动

用python写一个kafka的示例

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError

# 主题
topic = 'test_topic'
# 创建一个生产者对象
producer = KafkaProducer(bootstrap_servers=['xxx.xxx.x.xx:xxxx'])
for i in range(20):
    # 发送消息
    message = f'消息内容 i = {i}'
    future = producer.send(topic, bytes(message, 'utf-8'))
    try:
        # 等待消息发送完成,并打印出消息的分区和偏移量
        record_metadata = future.get(timeout=10)
        print(f'{message} 发送到分区 {record_metadata.partition} 偏移量 {record_metadata.offset}')
    except KafkaError as e:
        print('发送消息失败, {}: {}'.format(message, e))
# 创建一个消费者对象
consumer = KafkaConsumer(topic, bootstrap_servers=['xxx.xxx.x.xx:xxxx'], auto_offset_reset='earliest',
                         enable_auto_commit=True, group_id='my-group', max_poll_records=10)

while True:
    # 从Kafka集群中拉取消息
    messages = consumer.poll(timeout_ms=1000)
    if not messages:
        continue
    for topic_partition, records in messages.items():
        for record in records:
            print(f"从Kafka集群中拉取消息 = {record.value.decode('utf-8')}")

# See PyCharm help at https://www.jetbrains.com/help/pycharm/