建站经验,中国制造网官方网站国际站,网址备案,网站加背景音乐文章目录 一、上传压缩包二、解压压缩包三、监控本地文件#xff08;file to kafka#xff09;3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包#xff08;1#xff09;创建maven项目#xff08;2#xff09;开发拦截器类#xff08;3#xff09;开发pom文件file to kafka3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包1创建maven项目2开发拦截器类3开发pom文件4打成jar包上传到Flume 3.2.3 修改配置文件 3.3 创建Kafka Topic3.4 启动Flume3.5 停止Flume 四、监控Kafkakafka to hdfs3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.33.1 自定义拦截器3.2 编写配置文件3.3 启动Flume3.4 停止Flume 五、监控 ipportTODO 一、上传压缩包 官网https://flume.apache.org/ 二、解压压缩包
[mallmall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/三、监控本地文件file to kafka Flume是用java写的所以需要确保JDK环境可用 需求描述监控目录下多个文件写入Kafka TAILDIR SOURCE本质是tail -F [file]命令只能监控文件的新增和修改不能处理历史文件。 3.1 编写配置文件
[mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mallmall apache-flume-1.9.0-bin]$ mkdir job
[mallmall apache-flume-1.9.0-bin]$ cd job/
[mallmall job]$ vim file_to_kafka.conf内容
# 0、配置agent给source channel sink组件命名
a1.sources r1
a1.channels c1# 1、配置source组件
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/module/applog/app.*
# 断点续传标记信息存储位置
a1.sources.r1.positionFile /opt/module/apache-flume-1.9.0-bin/taildir_position.json# 2、配置channel组件event临时缓冲区
a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic topic_mall_applog# 按照字符串类型传到kafka去
a1.channels.c1.parseAsFlumeEvent false# 3、配置source、channel、sink之间的连接关系
a1.sources.r1.channels c13.2 自定义拦截器 作用拦截events经拦截器处理输出处理后的events。 开发创建maven项目打成jar包形式上传到flume所在机器 3.2.1 开发拦截器jar包
1创建maven项目
2开发拦截器类
package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;/*** date 2023/11/21 20:40* 功能剔除掉非json格式数据** 1、实现接口* 2、实现抽象方法* 3、建造者模式静态内部类*/
public class ETLInterceptor implements Interceptor {public void initialize() {}// 将log中event为非json格式数据置为nullpublic Event intercept(Event event) {byte[] body event.getBody();// byte数组转为字符串String log new String(body, StandardCharsets.UTF_8);boolean flag false;// 判断log是否是json格式try {JSONObject jsonObject JSONObject.parseObject(log);flag true;} catch (JSONException e) {}return flag ? event : null;}// 将log中event为null的删掉public ListEvent intercept(ListEvent events) {// 遍历eventsIteratorEvent iterator events.iterator();while (iterator.hasNext()) {Event event iterator.next();if (intercept(event) null) {iterator.remove();}}return events;}public void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}
}3开发pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.songshuang/groupIdartifactIdflume_interceptor/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build
/project4打成jar包上传到Flume 上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下 3.2.3 修改配置文件
[mallmall job]$ vim file_to_kafka.conf新增内容
# 自定义拦截器
a1.sources.r1.interceptors i1
# 指定自定义拦截器的建造者类名入口
a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.ETLInterceptor$Builder3.3 创建Kafka Topic 为什么要手动创建topicflume自动创建的topic默认1个分区每个分区1个副本。手动创建可以指定分区和副本数可以有效利用Kafka集群资源。 –bootstrap-server参数作用连接Kafka集群 [hadoophadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog3.4 启动Flume 注意放开Kafka集群所在机器9092端口对Flume所在机器放开。 原因Flume需要向Kafka集群写入数据所以需要具有访问Kafka集群端口的权限。 – conf参数配置文件存储所在目录 – name参数agent名称每个Flume配置文件就是一个agent。 – conf-file参数flume本次启动读取的配置文件 nohup配合后台运行 /dev/null将标准输出重定向到 /dev/null 即丢弃所有输出 2/dev/null将标准错误输出重定向到 /dev/null 即丢弃所有错误输出 [mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mallmall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf /dev/null 2/dev/null 3.5 停止Flume
[mallmall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf
[mallmall apache-flume-1.9.0-bin]$ kill 11001四、监控Kafkakafka to hdfs 需求描述监控Kafka将数据写入HDFS 如果想要从头消费需要设置kafka.consumer.auto.offset.reset earliest默认从最新offset开始 注意需要在HDFS所在机器部署FLume需要调用HADOOP相关jar包。 3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3 否则Flume向HDFS写数据时会失败 [hadoophadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar3.1 自定义拦截器 作用按照kafka消息中的时间字段决定消息存储到hdfs的哪个文件中。 代码
package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** date 2023/11/22 16:52* 作用获取kafka中时间戳字段放入event头中flume写入hdfs时从头部获取时间作为该event放入hdfs的文件夹名称*/
public class TimestampInterceptor implements Interceptor {Overridepublic void initialize() {}// 获取kafka时间戳字段放入event的headerOverridepublic Event intercept(Event event) {byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);String ts jsonObject.getString(ts);MapString, String headers event.getHeaders();headers.put(timestamp,ts); // event是引用变量类型存储的是地址header变了自然event所对应地址上的值就变了return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampInterceptor();}Overridepublic void configure(Context context) {}}
}3.2 编写配置文件
[hadoophadoop104 job]$ vim kafka_to_hdfs.conf内容 a1.sources.r1.kafka.consumer.group.id消费者组名。 a1.channels.c1.typefile类型channel缓冲数据放在磁盘中而不是内存中。 a1.channels.c1.dataDirsfile channel缓冲内容落盘地址。 a1.channels.c1.checkpointDir检查点存放位置用于断点续传。 # Name the components on this agent
a1.sources r1
a1.sinks k1
a1.channels c1# 配置source
a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics topic_mall_applog
a1.sources.r1.kafka.consumer.group.id consumer_group_flume
# 指定consumer从哪个offset开始消费默认latest
# a1.sources.r1.kafka.consumer.auto.offset.reset earliest
# 自定义拦截器
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.TimestampInterceptor$Builder# 配置sink
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path /warehouse/applog/%Y-%m-%d
a1.sinks.k1.hdfs.codeC gzip# 配置channel
a1.channels.c1.type file
a1.channels.c1.dataDirs /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs
a1.channels.c1.checkpointDir /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs# Bind the source and sink to the channel
a1.sources.r1.channels c1
a1.sinks.k1.channel c13.3 启动Flume 注意1需要放开kafka端口即9092端口Flume要读Kafka。 [hadoophadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/
[hadoophadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf /dev/null 2/dev/null 3.4 停止Flume
[mallmall apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs.conf
[mallmall apache-flume-1.9.0-bin]$ kill 11001五、监控 ipportTODO