1. Overview

1.1 What is DataX

DataX is an open-source offline synchronization tool for heterogeneous data sources from Alibaba. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP and more.

1.2 The design of DataX

To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point synchronization links into a star-shaped topology: DataX sits in the middle as the transport hub and connects to every data source. When a new data source needs to be added, it only has to be integrated with DataX once, and it can then synchronize seamlessly with all of the existing data sources.

1.3 Framework design

Reader: the data collection module. It reads data from the source and hands it to the Framework.
Writer: the data writing module. It continuously pulls data from the Framework and writes it to the target.
Framework: connects Reader and Writer, serves as the data transfer channel between them, and handles the core concerns of buffering, flow control, concurrency and data conversion.

Runtime model:
Job: the management node of a single job. It is responsible for data cleanup, splitting the job into sub-tasks, and monitoring and managing the TaskGroups.
Task: produced by splitting a Job, it is the smallest unit of a DataX job; each Task synchronizes a portion of the data.
Schedule: assembles Tasks into TaskGroups; a single TaskGroup runs Tasks with a concurrency of 5.
TaskGroup: responsible for starting its Tasks.

1.4 Data sources supported by DataX

| Type                          | Data source                              | Reader | Writer |
| ----------------------------- | ---------------------------------------- | ------ | ------ |
| RDBMS (relational databases)  | MySQL                                    | √      | √      |
|                               | Oracle                                   | √      | √      |
|                               | SQL Server                               | √      | √      |
|                               | PostgreSQL                               | √      | √      |
|                               | DRDS                                     | √      | √      |
|                               | Generic RDBMS (any relational database)  | √      | √      |
| Alibaba Cloud data warehouses | ODPS                                     | √      | √      |
|                               | ADS                                      |        | √      |
|                               | OSS                                      | √      | √      |
|                               | OCS                                      | √      | √      |
| NoSQL data stores             | OTS                                      | √      | √      |
|                               | HBase 0.94                               | √      | √      |
|                               | HBase 1.1                                | √      | √      |
|                               | Phoenix 4.x                              | √      | √      |
|                               | Phoenix 5.x                              | √      | √      |
|                               | MongoDB                                  | √      | √      |
|                               | Hive                                     | √      | √      |
|                               | Cassandra                                | √      | √      |
| Unstructured data stores      | TxtFile                                  | √      | √      |
|                               | FTP                                      | √      | √      |
|                               | HDFS                                     | √      | √      |
|                               | Elasticsearch                            |        | √      |
| Time series databases         | OpenTSDB                                 | √      |        |
|                               | TSDB                                     | √      | √      |

2. Quick start

2.1 Environment setup

Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Source code: https://github.com/alibaba/DataX

Requirements:
Linux
JDK (1.8 or above; 1.8 recommended)
Python (Python 2.6.x recommended)

1) Upload the downloaded datax.tar.gz to any node of the cluster; here it is uploaded to /export/soft on node01.

2) Extract it to /export/servers/:

[root@node01 soft]# tar -zxvf datax.tar.gz -C ../servers/

3) Run the self-check script. If it finishes successfully, your environment is fine.
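The self-check command itself is only shown as a screenshot in the original post; it simply runs the sample job bundled with the DataX distribution, roughly like this (assuming the install path from step 2):

# Run DataX's bundled stream-to-stream sample job as an environment self check
cd /export/servers/datax
python bin/datax.py job/job.json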
2.2 Notes on environment setup

If the self check fails with an error like:

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.
(the file does not exist; check your configuration)

the hidden ._* plugin description files shipped in the tarball have to be deleted. A first attempt such as

find ./* -type f -name .*er | xargs rm -rf

fails with:

find: paths must precede expression:
Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]

Use explicit paths instead:

find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf

Replace /datax/plugin/reader/ and /datax/plugin/writer/ with your own plugin directories.
Reference: https://blog.csdn.net/dz77dz/article/details/127055299

2.3 Read data from MySQL and write it to HDFS

Preparation: create the database and table, and load some test data.

create database test;
use test;
create table c_s(
    id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 1, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 2, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 3, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 5, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 6, '201967');

View the official template:

[root@node01 datax]# bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:
    https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [],
            "connection": [
              {
                "jdbcUrl": [],
                "table": []
              }
            ],
            "password": "",
            "username": "",
            "where": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [],
            "compress": "",
            "defaultFS": "",
            "fieldDelimiter": "",
            "fileName": "",
            "fileType": "",
            "path": "",
            "writeMode": ""
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": ""
      }
    }
  }
}

Modify it based on the official template:

[root@node01 datax]# vim job/mysqlToHDFS.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id", "c_id", "s_id"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://node02:3306/test"],
                "table": ["c_s"]
              }
            ],
            "password": "123456",
            "username": "root"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id",   "type": "string"},
              {"name": "c_id", "type": "int"},
              {"name": "s_id", "type": "string"}
            ],
            "defaultFS": "hdfs://node01:8020",
            "fieldDelimiter": "\t",
            "fileName": "c_s.txt",
            "fileType": "text",
            "path": "/",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

Note the HDFS NameNode port: it depends on the Hadoop version; for example, 2.7.4 uses 9000, so defaultFS would be hdfs://node01:9000.

mysqlreader parameters: (illustration from the original post omitted; see the mysqlreader document linked above)
hdfswriter parameters: (illustration from the original post omitted; see the hdfswriter document linked above)
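If you are not sure which address and port your cluster uses, it can be read from the client configuration before editing defaultFS (a quick check, not part of the original post; assumes the hdfs client is on the PATH):

# Print the NameNode address/port so that "defaultFS" in hdfswriter matches it
hdfs getconf -confKey fs.defaultFS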
Run the script:

[root@node01 datax]# bin/datax.py job/mysqlToHDFS.json

2020-10-02 16:12:16.358 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /export/servers/datax/hook
2020-10-02 16:12:16.359 [job-0] INFO JobContainer -
[total cpu info]
    averageCpu   | maxDeltaCpu | minDeltaCpu
    -1.00%       | -1.00%      | -1.00%
[total gc info]
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1            | 1               | 1               | 0.245s      | 0.245s         | 0.245s
    PS Scavenge  | 1            | 1               | 1               | 0.155s      | 0.155s         | 0.155s
2020-10-02 16:12:16.359 [job-0] INFO JobContainer - PerfTrace not enable!
2020-10-02 16:12:16.359 [job-0] INFO StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-10-02 16:12:16.360 [job-0] INFO JobContainer -
任务启动时刻 : 2020-10-02 16:12:04
任务结束时刻 : 2020-10-02 16:12:16
任务总计耗时 : 12s
任务平均流量 : 5B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数                    :                  0
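The post does not show the resulting file; a quick way to inspect it on HDFS is sketched below (not from the original; hdfswriter normally appends a random suffix to the configured fileName, hence the wildcard):

# List and print the file(s) DataX wrote under the configured path "/"
hdfs dfs -ls /c_s.txt*
hdfs dfs -cat /c_s.txt* | head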
2.4 Read data from HDFS and write it to MySQL

Preparation:

create database test;
use test;
create table c_s2(
    id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);

View the official template:

[root@node01 datax]# bin/datax.py -r hdfsreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the hdfsreader document:
    https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mysqlwriter document:
    https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "column": [],
            "defaultFS": "",
            "encoding": "UTF-8",
            "fieldDelimiter": ",",
            "fileType": "orc",
            "path": ""
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [],
            "connection": [
              {
                "jdbcUrl": "",
                "table": []
              }
            ],
            "password": "",
            "preSql": [],
            "session": [],
            "username": "",
            "writeMode": ""
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": ""
      }
    }
  }
}

Modify it based on the official template:

[root@node01 datax]# vim job/hdfsTomysql.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "column": ["*"],
            "defaultFS": "hdfs://node01:8020",
            "encoding": "UTF-8",
            "fieldDelimiter": "\t",
            "fileType": "text",
            "path": "/c_s.txt"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": ["id", "c_id", "s_id"],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://node02:3306/test",
                "table": ["c_s2"]
              }
            ],
            "password": "123456",
            "username": "root",
            "writeMode": "replace"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

Run the script:

[root@node01 datax]# bin/datax.py job/hdfsTomysql.json

[total cpu info]
    averageCpu   | maxDeltaCpu | minDeltaCpu
    -1.00%       | -1.00%      | -1.00%
[total gc info]
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1            | 1               | 1               | 0.026s      | 0.026s         | 0.026s
    PS Scavenge  | 1            | 1               | 1               | 0.015s      | 0.015s         | 0.015s

2020-10-02 16:57:13.152 [job-0] INFO JobContainer - PerfTrace not enable!
2020-10-02 16:57:13.152 [job-0] INFO StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.033s | Percentage 100.00%
2020-10-02 16:57:13.153 [job-0] INFO JobContainer -
任务启动时刻 : 2020-10-02 16:57:02
任务结束时刻 : 2020-10-02 16:57:13
任务总计耗时 : 11s
任务平均流量 : 5B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数                    :                  0
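To confirm that the rows actually reached MySQL, the target table can be queried (a sketch, not shown in the original; connection details as used in the job above):

# Spot-check the MySQL target table written by mysqlwriter
mysql -h node02 -uroot -p123456 -e "select * from test.c_s2;"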
2.5 Import a MySQL table into Hive

1. Create the table in Hive:

-- Hive DDL
CREATE TABLE student2 (
    classNo string,
    stuNo   string,
    score   int
)
row format delimited fields terminated by ',';

-- Prepare some MySQL test data
create table if not exists student2(
    classNo varchar(50),
    stuNo   varchar(50),
    score   int
);
insert into student2 values('1001','1012ww10087',63);
insert into student2 values('1002','1012aa10087',63);
insert into student2 values('1003','1012bb10087',63);
insert into student2 values('1004','1012cc10087',63);
insert into student2 values('1005','1012dd10087',63);
insert into student2 values('1006','1012ee10087',63);

2. Write the mysql2hive.json configuration file:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "table": ["student2"],
                "jdbcUrl": ["jdbc:mysql://192.168.43.10:3306/mytestmysql"]
              }
            ],
            "column": ["classNo", "stuNo", "score"]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://192.168.43.10:9000",
            "path": "/hive/warehouse/home/myhive.db/student2",
            "fileName": "myhive",
            "writeMode": "append",
            "fieldDelimiter": ",",
            "fileType": "text",
            "column": [
              {"name": "classNo", "type": "string"},
              {"name": "stuNo",   "type": "string"},
              {"name": "score",   "type": "int"}
            ]
          }
        }
      }
    ]
  }
}
3. Run the script:

bin/datax.py job/mysql2hive.json

4. Check whether the Hive table now contains data.
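One way to check is to query the Hive table directly (a sketch; the database name myhive is inferred from the warehouse path in the writer config above):

# Verify that the imported rows are visible in Hive
hive -e "select * from myhive.student2 limit 10;"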
2.6 Export a Hive table to MySQL

1. First create the target table in MySQL:

create table if not exists student(
    classNo varchar(50),
    stuNo   varchar(50),
    score   int
);

2. Write the hive2mysql.json configuration file:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/hive/warehouse/home/myhive.db/student/*",
            "defaultFS": "hdfs://192.168.43.10:9000",
            "column": [
              {"index": 0, "type": "string"},
              {"index": 1, "type": "string"},
              {"index": 2, "type": "Long"}
            ],
            "fileType": "text",
            "encoding": "UTF-8",
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "root",
            "password": "root",
            "column": ["classNo", "stuNo", "score"],
            "preSql": ["delete from student"],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://192.168.43.10:3306/mytestmysql?useUnicode=true&characterEncoding=utf8",
                "table": ["student"]
              }
            ]
          }
        }
      }
    ]
  }
}
Notes:

The Hive ODS-layer CREATE TABLE statement uses "," as the field delimiter:
fields terminated by ','

The DataX json file must use the same "," delimiter:
"fieldDelimiter": ","

It simply has to match the delimiter used by the Hive table. Also, because DataX does not fully support every Hive data type, the column types in the hdfsreader section of the DataX job file should be mapped to types that DataX does support.
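The post stops at the configuration; by analogy with the earlier sections, running the export and spot-checking the MySQL table would look roughly like this (a sketch, not part of the original; hosts and credentials are those used in the configs above):

# Run the Hive -> MySQL export job, then check the target table
bin/datax.py job/hive2mysql.json
mysql -h 192.168.43.10 -uroot -proot -e "select * from mytestmysql.student limit 10;"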