当前位置: 首页 > news >正文

池州市网站建设_网站建设公司_网站制作_seo优化

建站经验,中国制造网官方网站国际站,网址备案,网站加背景音乐文章目录 一、上传压缩包二、解压压缩包三、监控本地文件#xff08;file to kafka#xff09;3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包#xff08;1#xff09;创建maven项目#xff08;2#xff09;开发拦截器类#xff08;3#xff09;开发pom文件file to kafka3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包1创建maven项目2开发拦截器类3开发pom文件4打成jar包上传到Flume 3.2.3 修改配置文件 3.3 创建Kafka Topic3.4 启动Flume3.5 停止Flume 四、监控Kafkakafka to hdfs3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.33.1 自定义拦截器3.2 编写配置文件3.3 启动Flume3.4 停止Flume 五、监控 ipportTODO 一、上传压缩包 官网https://flume.apache.org/ 二、解压压缩包 [mallmall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/三、监控本地文件file to kafka Flume是用java写的所以需要确保JDK环境可用 需求描述监控目录下多个文件写入Kafka TAILDIR SOURCE本质是tail -F [file]命令只能监控文件的新增和修改不能处理历史文件。 3.1 编写配置文件 [mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/ [mallmall apache-flume-1.9.0-bin]$ mkdir job [mallmall apache-flume-1.9.0-bin]$ cd job/ [mallmall job]$ vim file_to_kafka.conf内容 # 0、配置agent给source channel sink组件命名 a1.sources r1 a1.channels c1# 1、配置source组件 a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/app.* # 断点续传标记信息存储位置 a1.sources.r1.positionFile /opt/module/apache-flume-1.9.0-bin/taildir_position.json# 2、配置channel组件event临时缓冲区 a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.channels.c1.kafka.topic topic_mall_applog# 按照字符串类型传到kafka去 a1.channels.c1.parseAsFlumeEvent false# 3、配置source、channel、sink之间的连接关系 a1.sources.r1.channels c13.2 自定义拦截器 作用拦截events经拦截器处理输出处理后的events。 开发创建maven项目打成jar包形式上传到flume所在机器 3.2.1 开发拦截器jar包 1创建maven项目 2开发拦截器类 package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;/*** date 2023/11/21 20:40* 功能剔除掉非json格式数据** 1、实现接口* 2、实现抽象方法* 3、建造者模式静态内部类*/ public class ETLInterceptor implements Interceptor {public void initialize() {}// 将log中event为非json格式数据置为nullpublic Event intercept(Event event) {byte[] body event.getBody();// byte数组转为字符串String log new String(body, StandardCharsets.UTF_8);boolean flag false;// 判断log是否是json格式try {JSONObject jsonObject JSONObject.parseObject(log);flag true;} catch (JSONException e) {}return flag ? event : null;}// 将log中event为null的删掉public ListEvent intercept(ListEvent events) {// 遍历eventsIteratorEvent iterator events.iterator();while (iterator.hasNext()) {Event event iterator.next();if (intercept(event) null) {iterator.remove();}}return events;}public void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}} }3开发pom文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.songshuang/groupIdartifactIdflume_interceptor/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build /project4打成jar包上传到Flume 上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下 3.2.3 修改配置文件 [mallmall job]$ vim file_to_kafka.conf新增内容 # 自定义拦截器 a1.sources.r1.interceptors i1 # 指定自定义拦截器的建造者类名入口 a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.ETLInterceptor$Builder3.3 创建Kafka Topic 为什么要手动创建topicflume自动创建的topic默认1个分区每个分区1个副本。手动创建可以指定分区和副本数可以有效利用Kafka集群资源。 –bootstrap-server参数作用连接Kafka集群 [hadoophadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog3.4 启动Flume 注意放开Kafka集群所在机器9092端口对Flume所在机器放开。 原因Flume需要向Kafka集群写入数据所以需要具有访问Kafka集群端口的权限。 – conf参数配置文件存储所在目录 – name参数agent名称每个Flume配置文件就是一个agent。 – conf-file参数flume本次启动读取的配置文件 nohup配合后台运行 /dev/null将标准输出重定向到 /dev/null 即丢弃所有输出 2/dev/null将标准错误输出重定向到 /dev/null 即丢弃所有错误输出 [mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/ [mallmall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf /dev/null 2/dev/null 3.5 停止Flume [mallmall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf [mallmall apache-flume-1.9.0-bin]$ kill 11001四、监控Kafkakafka to hdfs 需求描述监控Kafka将数据写入HDFS 如果想要从头消费需要设置kafka.consumer.auto.offset.reset earliest默认从最新offset开始 注意需要在HDFS所在机器部署FLume需要调用HADOOP相关jar包。 3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3 否则Flume向HDFS写数据时会失败 [hadoophadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar3.1 自定义拦截器 作用按照kafka消息中的时间字段决定消息存储到hdfs的哪个文件中。 代码 package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map;/*** date 2023/11/22 16:52* 作用获取kafka中时间戳字段放入event头中flume写入hdfs时从头部获取时间作为该event放入hdfs的文件夹名称*/ public class TimestampInterceptor implements Interceptor {Overridepublic void initialize() {}// 获取kafka时间戳字段放入event的headerOverridepublic Event intercept(Event event) {byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);String ts jsonObject.getString(ts);MapString, String headers event.getHeaders();headers.put(timestamp,ts); // event是引用变量类型存储的是地址header变了自然event所对应地址上的值就变了return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampInterceptor();}Overridepublic void configure(Context context) {}} }3.2 编写配置文件 [hadoophadoop104 job]$ vim kafka_to_hdfs.conf内容 a1.sources.r1.kafka.consumer.group.id消费者组名。 a1.channels.c1.typefile类型channel缓冲数据放在磁盘中而不是内存中。 a1.channels.c1.dataDirsfile channel缓冲内容落盘地址。 a1.channels.c1.checkpointDir检查点存放位置用于断点续传。 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# 配置source a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics topic_mall_applog a1.sources.r1.kafka.consumer.group.id consumer_group_flume # 指定consumer从哪个offset开始消费默认latest # a1.sources.r1.kafka.consumer.auto.offset.reset earliest # 自定义拦截器 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.TimestampInterceptor$Builder# 配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /warehouse/applog/%Y-%m-%d a1.sinks.k1.hdfs.codeC gzip# 配置channel a1.channels.c1.type file a1.channels.c1.dataDirs /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs a1.channels.c1.checkpointDir /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c13.3 启动Flume 注意1需要放开kafka端口即9092端口Flume要读Kafka。 [hadoophadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/ [hadoophadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf /dev/null 2/dev/null 3.4 停止Flume [mallmall apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs.conf [mallmall apache-flume-1.9.0-bin]$ kill 11001五、监控 ipportTODO
http://www.lebaoying.cn/news/11999.html

相关文章:

  • 购物网站开发的需求分析asp与php做网站哪个好
  • 网站推广设计制作wordpress阿里云数据库
  • 2345网址导航怎么卸载seo是什么的缩写
  • 自己建站模板做网站公司苏州
  • 在自己网站建立自己的外链湖北省和建设厅网站
  • 专业高端网站建设服务公司新媒体运营师
  • 网站网址没有被百度收录电子商务网站建设学什么
  • 德州网站设计手机网站 微信
  • 那个网站的系统好1688网站可以自己做吗
  • 什么是网站推广网站开发设计工程师
  • 网站rss生成wordpress frp
  • 网站的稳定性wordpress插件修改
  • 网站流量与广告费大连网站建设兼职
  • 通州重庆网站建设网站与新闻建设总结
  • 百度云盘网站开发网站续费贵是重新做个好还是续费
  • 做网站编辑校对网站单页生成器
  • 南京网站c建设云世家什么网站教你做美食
  • 网站集约化建设调研报告怎么样备份网站数据
  • 好一点网站建设公司广州好的做网站公司
  • 值得买网站模板邯郸招聘信息最新招聘
  • 外贸产品推广网站正规app软件开发价格
  • 网站怎么做微信接口怎样申请一个免费网站
  • 外贸网站 域名后缀军事头条
  • 网站信息可以边建设边组织北大青鸟的网站建设课程多少钱
  • 塑胶原料东莞网站建设常州哪家做网站便宜
  • 网站托管服务适用于哪个网站买做房图纸好
  • 网站做支付宝和网银接口备案名称和网站名称
  • 具体的网站建设昆山建设工程交易网站
  • 台州网站建设网站中国建设银行官网站积分抽奖
  • 宁夏建设监督网站家居设计