整体架构如下:
日志--Watchdog--(Kafka)-->Flink--后端springboot--(WebSocket)--> 前端(Vue + ECharts)
|
Zookeeper
配置3台虚拟机,下面是3台电脑的配置
主机名 | ip地址 | broker.id | myid |
---|---|---|---|
node1 | 192.168.187.121 | 1 | 1 |
node2 | 192.168.187.122 | 2 | 2 |
node3 | 192.168.187.123 | 9 | 9 |
安装包下载链接 http://kafka.apache.org/downloads.html
下载所需要的版本,该设计使用的是kafka_2.11-2.0.0
将下载的kafka安装包通过XShell,FinalShell等Liunx连接工具,将安装包放入一个指定目录即可,演示文档通过放在将安装包放在/home/semir/packages,后面的工具都是如此
cd到指定目录
cd /home/hadoop/apps
解压到/home/semir/software下,可以在指定目录下看见kafka
tar -zxvf kafka_2.11-2.0.0.tgz -C /home/hadoop/apps
修改配置
cd /home/semir/software/kafka_2.12-2.4.1/config
vim server.properties
#broker的全局唯一编号,不能重复 | broker.id=0 |
---|---|
#删除topic功能使能 | delete.topic.enable=true |
#自动创建topic,false:生产者发送信息到已存在topic才没有报错 | auto.create.topics.enable = false |
#处理网络请求的线程数量 | num.network.threads=3 |
#用来处理磁盘IO的现成数量 | num.io.threads=8 |
#发送套接字的缓冲区大小 | socket.send.buffer.bytes=102400 |
#接收套接字的缓冲区大小 | socket.receive.buffer.bytes=102400 |
#请求套接字的缓冲区大小 | socket.request.max.bytes=104857600 |
#kafka运行日志存放的路径 | log.dirs=/opt/module/kafka/logs |
#topic在当前broker上的分区个数 | num.partitions=3 |
#用来恢复和清理data下数据的线程数量 | num.recovery.threads.per.data.dir=1 |
#配置连接 Zookeeper 集群地址 | zookeeper.connect=() |
如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
其几两台服务器相同操作,broker.id在集群中要唯一
配置文件如下,因为是集群配置,broke.id不一样即可,下面其余配置看对应需求选择修改
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/tmp/kafka/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
2.1.5安装ZooKeeper
首先,进入Zookeeper官网(https://zookeeper.apache.org/),下载你所需要的ZooKeeper版本,本设计采用的是3.6.3版本
cd到指定目录
cd /home/hadoop/apps
解压到/home/semir/software下,可以在指定目录下看见kafka
tar -zxvf zookeeper-3.4.5.tar.gz -C /home/hadoop/apps
zookeeper有点不一样,先将配置文件模板复制一份
cp zoo_sample.cfg zoo.cfg
修改配置
cd /home/semir/software/zookeeper-3.4.5/conf/
vim zoo.cfg
修改配置文件Zookeeper.properties
# zookeeper时间配置中的基本单位(毫秒) | tickTime=2000 |
---|---|
# 允许follower初始化连接到leader最大时长,它表示tickTime时间的倍数 即: | initLimit*tickTime initLimit=10 |
# 运行follower与leader数据同步最大时长,它表示tickTime时间倍数 | syncLimit*tickTime syncLimit=5 |
# zookeeper数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中) | dataDir=/tmp/zookeeper |
# 对客户端提供的端口号 | clientPort=2181 |
# 单个客户端于zookeeper最大并发连接数 | maxClientCnxns=60 |
# 保存的数据快照数量,之外的将会被清除 | autopurge.snapRetainCount=3 |
# 自动出发清除任务时间间隔,以小时为单位。默认为0,表示不自动清除 | autopurge.purgeInterval=1 |
#metricsProvider.httpPort=7000 | #metricsProvider.exportJvmInfo=true |
## ttl settings | extendedTypesEnabled=true |
配置文件如下:将三台机器都按照这种方式配置即可
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper/tmp
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=0.0.0.0:2888:3888
server.2=192.168.1.182:2888:3888
server.3=192.168.1.189:2888:3888
启动命令,因为Kafka的启动依赖Zookeeper,我们先启动Zookeeper
cd /home/semir/software/zookeeper/bin/
sh zkServer.sh start
使用jps查看或者sh zkServer.sh status启动是否成功
三台Zookeeper启动成功后,启动Kafka,使用nohup将服务运行在后台,不会因为当前窗口的关闭而关闭服务,输出的日志文件将追加输出到nohup.out。
nohup kafka-server-start.sh /home/semir/software/kafka_2.12-2.4.1/config/server.properties &
jps #查看Kafka是否启动
服务器: HA181(Master)
服务器: HA182(Slave)
服务器: HA183(Slave)
http://flink.apache.org/downloads.html
cd到指定目录
cd /home/semir/packages
解压到/home/semir/software下,可以在指定目录下看见kafka
tar -zxvf flink-1.12.0 -C /home/semir/software/
修改配置
cd /home/semir/software/flink-1.12.0/conf/
vim flink-conf.yaml
集群配置如下:
jobmanager.rpc.address: 192.168.1.181
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
指定workers
vim workers
配置如下:
192.168.1.182
192.168.1.189
指定master
vim masters
配置如下:
192.168.1.181
启动Flink集群
bin/start-cluster.sh
通过jps查看进程信息
--------------------- node1 ----------------
86583 Jps
85963 StandaloneSessionClusterEntrypoint
86446 TaskManagerRunner
--------------------- node2 ----------------
44099 Jps
43819 TaskManagerRunner
--------------------- node3 ----------------
29461 TaskManagerRunner
29678 Jps
浏览Flink Web UI界面
[http://192.168.1.181:8081]
启动:./bin/start-cluster.sh
停止:./bin/stop-cluster.sh
启动:./bin/start-cluster.sh
停止:./bin/stop-cluster.sh
bin/jobmanager.sh start
bin/jobmanager.sh stop
bin/taskmanager.sh start
bin/taskmanager.sh stop
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。