Importing Data from MySQL into Hive with DataX and Sqoop

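I. DataX Overview

DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used within Alibaba Group. DataX provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
Source code: https://github.com/alibaba/DataX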

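II. Sqoop Overview

The Apache Sqoop (SQL-to-Hadoop) project is designed to move bulk data efficiently between RDBMSs and Hadoop. With Sqoop, users can easily import data from a relational database into Hadoop and related systems (such as HBase and Hive), and can also extract data from Hadoop and export it to a relational database.
Sqoop is a tool for bulk data transfer between structured data stores and Hadoop; the structured data can live in an RDBMS such as MySQL or Oracle. Under the hood, Sqoop uses MapReduce programs to perform extraction, transformation, and loading. MapReduce provides parallelism and fault tolerance by design, and compared with traditional ETL tools such as Kettle, the jobs run on the Hadoop cluster, which reduces the load on the ETL server. In certain scenarios the extraction step can be significantly faster.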

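III. Requirements

The ecommerce database in MySQL contains the tables below. They need to be imported into the ecommerce database in Hive as the ODS layer for further processing.

  • User information table (t_member)
  • User address table (t_member_addr)
  • Commodity information table (t_commodity)
  • Commodity category table (t_commodity_cate)
  • Order table (t_order)
  • Order commodity table (t_order_commodity)
  • Coupon table (t_coupon)
  • User coupon table (t_coupon_member)
  • Order coupon table (t_coupon_order)
  • Delivery table (t_delivery)
  • Feedback table (t_feedback)
  • Shop table (t_shop)
  • Shop order table (t_shop_order; orders are also recorded on the seller side)
  • Back-office user table (t_user; the employee table)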

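IV. Implementation

The import from MySQL into Hive can be implemented in two ways: with Sqoop or with DataX.

3.1 Importing Data from MySQL into Hive with DataX

Create the ecommerce database in Hive:
create database ecommerce;
Write a Python script, gen_import_config.py, that generates the DataX configuration files in bulk: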


import json
import getopt
import os
import sys
import MySQLdb

mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

output_path = "/opt/module/datax/job/import"

def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

def get_mysql_columns(database, table):
    # Build a real list (not a lazy map object) so json.dump can serialize it on Python 3 too
    return [x[0] for x in get_mysql_meta(database, table)]

def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string",
            "tinytext": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    # Build a real list (not a lazy map object) so json.dump can serialize it on Python 3 too
    return [{"name": x[0], "type": type_mapping(x[1].lower())} for x in meta]

def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)

def main(args):
    source_database = ""
    source_table = ""

    # Short options -d and -t each take a value; the long forms are --sourcedb and --sourcetbl
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)

if __name__ == '__main__':
    main(sys.argv[1:])
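
The type_mapping helper in the script above raises a KeyError for any MySQL type that is not in its dictionary (for example mediumtext or longtext). A minimal sketch of one way to soften that, assuming it is acceptable to fall back to string for unknown types; the names MYSQL_TO_HIVE and to_hive_columns are hypothetical and not part of the original script:

```python
# Hypothetical variant of type_mapping with a default fallback.
# The mapping entries are copied from the script above; the fallback to
# "string" is an assumption, not something the original script does.
MYSQL_TO_HIVE = {
    "bigint": "bigint", "int": "bigint", "smallint": "bigint", "tinyint": "bigint",
    "decimal": "string", "double": "double", "float": "float",
    "binary": "string", "char": "string", "varchar": "string",
    "datetime": "string", "time": "string", "timestamp": "string",
    "date": "string", "text": "string", "tinytext": "string",
}


def type_mapping(mysql_type, default="string"):
    """Map a MySQL DATA_TYPE to a writer column type, falling back to `default`."""
    return MYSQL_TO_HIVE.get(mysql_type.lower(), default)


def to_hive_columns(meta):
    """Turn (column_name, data_type) rows into the writer's column list."""
    return [{"name": name, "type": type_mapping(data_type)} for name, data_type in meta]
```

Whether a silent fallback is preferable to failing fast depends on the pipeline; failing on an unknown type makes schema surprises visible earlier.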

Because the script accesses the MySQL database from Python, the driver must be installed first:

yum install -y MySQL-python

Run the configuration generator script once per table:


python /root/bin/gen_import_config.py -d ecommerce -t t_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_commodity_cate
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_member
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_order
python /root/bin/gen_import_config.py -d ecommerce -t t_delivery
python /root/bin/gen_import_config.py -d ecommerce -t t_feedback
python /root/bin/gen_import_config.py -d ecommerce -t t_member
python /root/bin/gen_import_config.py -d ecommerce -t t_member_addr
python /root/bin/gen_import_config.py -d ecommerce -t t_order
python /root/bin/gen_import_config.py -d ecommerce -t t_order_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_shop
python /root/bin/gen_import_config.py -d ecommerce -t t_shop_order
python /root/bin/gen_import_config.py -d ecommerce -t t_user
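
The fourteen invocations above differ only in the table name, so they can also be driven from a loop. A rough sketch, assuming the script lives at /root/bin/gen_import_config.py as shown above; build_commands and run_all are hypothetical helper names, and dry-run mode only prints the commands:

```python
import subprocess

# Table names taken from the commands above.
TABLES = [
    "t_commodity", "t_commodity_cate", "t_coupon", "t_coupon_member",
    "t_coupon_order", "t_delivery", "t_feedback", "t_member",
    "t_member_addr", "t_order", "t_order_commodity", "t_shop",
    "t_shop_order", "t_user",
]


def build_commands(database, tables, script="/root/bin/gen_import_config.py"):
    """Return one argument list per table, mirroring the manual commands."""
    return [["python", script, "-d", database, "-t", table] for table in tables]


def run_all(commands, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for cmd in commands:
        print(" ".join(cmd))
        if not dry_run:
            subprocess.check_call(cmd)  # stops at the first failing table


if __name__ == "__main__":
    run_all(build_commands("ecommerce", TABLES), dry_run=True)
```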

The generated JSON files can be found in /opt/module/datax/job/import.

Write a script that transfers the MySQL data to HDFS:

DATAX_HOME=/opt/module/datax

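# Make sure the HDFS target directory exists and is empty before importing.
handle_targetdir() {
  hadoop fs -test -e "$1"
  if [[ $? -eq 1 ]]; then
    echo "Path $1 does not exist, creating it..."
    hadoop fs -mkdir -p "$1"
  else
    echo "Path $1 already exists"
    # hadoop fs -count prints: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
    fs_count=$(hadoop fs -count "$1")
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "Path $1 is empty"
    else
      echo "Path $1 is not empty, clearing it..."
      hadoop fs -rm -r -f "$1/*"
    fi
  fi
}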

import_data() {
  datax_config=$1
  target_dir=$2

  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"t_commodity")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
  ;;
"t_commodity_cate")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
  ;;
"t_coupon")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
  ;;
"t_coupon_member")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
  ;;
"t_coupon_order")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
  ;;
"t_delivery")
  import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
  ;;
"t_feedback")
  import_data /opt/module/datax/job/import/ecommerce.t_feedback.json /ecommerce/t_feedback
  ;;
"t_member")
  import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
  ;;
"t_member_addr")
  import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
  ;;
"t_order")
  import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
  ;;
"t_order_commodity")
  import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
  ;;
"t_shop")
  import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
  ;;
"t_shop_order")
  import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
  ;;
"t_user")
  import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
  ;;
"all")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
  import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
  import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
  import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
  import_data /opt/module/datax/job/import/ecommerce.t_feedback.json /ecommerce/t_feedback
  import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
  import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
  import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
  import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
  import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
  import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
  import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
  ;;
esac
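
The handle_targetdir function decides whether the target directory is empty by reading the third field of the hadoop fs -count output, which is printed as DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME. The same check can be sketched in Python for readers who prefer to script it there; parse_fs_count and is_empty are hypothetical names, and the sample lines in the usage note are made up for illustration:

```python
def parse_fs_count(line):
    """Parse one line of `hadoop fs -count` output into a dict.

    Expected columns: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME.
    """
    parts = line.split(None, 3)  # split on whitespace, keep the path whole
    if len(parts) != 4:
        raise ValueError("unexpected hadoop fs -count output: %r" % line)
    return {
        "dir_count": int(parts[0]),
        "file_count": int(parts[1]),
        "content_size": int(parts[2]),
        "path": parts[3].strip(),
    }


def is_empty(line):
    """True when the directory holds zero bytes, as in the shell script."""
    return parse_fs_count(line)["content_size"] == 0
```

For instance, is_empty("1 0 0 /ecommerce/t_user") is true, while a line reporting a non-zero content size would trigger the clean-up branch.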

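After running the script above, the table data files are visible on HDFS.

Start hiveserver2 and create the tables in Hive, mapping each field to its MySQL counterpart.
The following SQL creates the tables and loads the data from HDFS (run "use ecommerce;" first so that the tables are created in the ecommerce database):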
DROP TABLE IF EXISTS t_commodity;
CREATE TABLE t_commodity (
  id BIGINT,
  commodity_name STRING,
  commodity_price DECIMAL(10,2),
  commodity_cate_one BIGINT,
  commodity_cate_two BIGINT,
  create_user_id BIGINT,
  status BIGINT,
  create_time STRING,
  update_time STRING
)COMMENT 'Commodity information table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_commodity/';
load data inpath 'hdfs:/ecommerce/t_commodity' into table t_commodity;

DROP TABLE IF EXISTS t_commodity_cate;
CREATE TABLE t_commodity_cate (
  id BIGINT,
  cate_name STRING,
  cate_parent_id BIGINT,
  create_user_id BIGINT,
  status BIGINT,
  create_time STRING,
  update_time STRING
)COMMENT 'Commodity category table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_commodity_cate/';
load data inpath 'hdfs:/ecommerce/t_commodity_cate' into table t_commodity_cate;

DROP TABLE IF EXISTS t_coupon;
CREATE TABLE t_coupon (
  id BIGINT,
  coupon_name STRING,
  coupon_price decimal(10,2),
  create_user_id BIGINT,
  create_time STRING,
  update_time STRING
)COMMENT 'Coupon table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_coupon/';
load data inpath 'hdfs:/ecommerce/t_coupon' into table t_coupon;

DROP TABLE IF EXISTS t_coupon_member;
CREATE TABLE t_coupon_member (
  id BIGINT,
  coupon_id BIGINT,
  member_id BIGINT,
  coupon_channel BIGINT COMMENT '1 purchased by user 2 issued by company',
  create_time STRING,
  update_time STRING
)COMMENT 'User coupon table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_coupon_member/';
load data inpath 'hdfs:/ecommerce/t_coupon_member' into table t_coupon_member;

DROP TABLE IF EXISTS t_member;
CREATE TABLE t_member (
  id BIGINT,
  name STRING,
  password STRING,
  sex STRING,
  phone STRING,
  address_default_id BIGINT,
  member_channel BIGINT COMMENT '1 IOS 2 android 3 WeChat mini program 4 WeChat official account 5 h5',
  mp_open_id STRING COMMENT 'WeChat official account openId',
  status BIGINT,
  create_time STRING,
  update_time STRING
)COMMENT 'User information table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_member/';
load data inpath 'hdfs:/ecommerce/t_member' into table t_member;

DROP TABLE IF EXISTS t_member_addr;
CREATE TABLE t_member_addr (
  id BIGINT,
  member_id BIGINT,
  contact_person STRING,
  contact_phone STRING,
  address STRING,
  create_time STRING,
  update_time STRING
)COMMENT 'User address table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_member_addr/';
load data inpath 'hdfs:/ecommerce/t_member_addr' into table t_member_addr;

DROP TABLE IF EXISTS t_delivery;
CREATE TABLE t_delivery (
  id BIGINT,
  delivery_no STRING,
  order_id BIGINT,
  shop_id BIGINT,
  postman BIGINT,
  pick_time STRING,
  arrive_time STRING,
  member_id BIGINT,
  member_addr_id BIGINT
)COMMENT 'Delivery table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_delivery/';
load data inpath 'hdfs:/ecommerce/t_delivery' into table t_delivery;

DROP TABLE IF EXISTS t_feedback;
CREATE TABLE t_feedback (
  id BIGINT,
  member_id BIGINT,
  create_user_id BIGINT,
  feedback_content STRING,
  feedback_type BIGINT COMMENT '1 damaged 2 out of stock 3 wrong item 4 complaint'
)COMMENT 'Feedback table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_feedback/';
load data inpath 'hdfs:/ecommerce/t_feedback' into table t_feedback;

DROP TABLE IF EXISTS t_shop;
CREATE TABLE t_shop (
  id BIGINT,
  shop_name STRING,
  city_id BIGINT,
  city_name STRING,
  area_id BIGINT,
  area_name STRING,
  charge_user STRING,
  create_time STRING,
  update_time STRING
)COMMENT 'Shop table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_shop/';
load data inpath 'hdfs:/ecommerce/t_shop' into table t_shop;

DROP TABLE IF EXISTS t_shop_order;
CREATE TABLE t_shop_order (
  id BIGINT,
  shop_id BIGINT,
  order_id BIGINT,
  start_time STRING,
  done_time STRING
)COMMENT 'Shop order table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_shop_order/';
load data inpath 'hdfs:/ecommerce/t_shop_order' into table t_shop_order;

DROP TABLE IF EXISTS t_user;
CREATE TABLE t_user (
  id BIGINT,
  user_name STRING,
  user_password STRING,
  user_phone STRING,
  create_time STRING,
  update_time STRING
)COMMENT 'Back-office user table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_user/';
load data inpath 'hdfs:/ecommerce/t_user' into table t_user;

DROP TABLE IF EXISTS t_order;
CREATE TABLE t_order (
  order_id BIGINT,
  member_id BIGINT,
  origin_price decimal(10,2),
  pay_price decimal(10,2),
  shop_id BIGINT,
  shop_name STRING,
  order_status STRING COMMENT '1 in progress 2 completed 3 cancelled',
  create_time STRING,
  update_time STRING
)COMMENT 'Order table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_order/';
load data inpath 'hdfs:/ecommerce/t_order' into table t_order;

DROP TABLE IF EXISTS t_order_commodity;
CREATE TABLE t_order_commodity (
  id BIGINT,
  order_id BIGINT,
  commodity_id BIGINT,
  commodity_name STRING,
  commodity_num BIGINT,
  commodity_price decimal(10,2),
  create_time STRING,
  update_time STRING
)COMMENT 'Order commodity table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/ecommerce/t_order_commodity/';
load data inpath 'hdfs:/ecommerce/t_order_commodity' into table t_order_commodity;
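
The statements above all follow one template: drop, create with tab-delimited rows, then load from the staging directory on HDFS. As an illustration only, the template could be rendered from a column list rather than written by hand. In this sketch build_ddl is a hypothetical helper, and the default paths simply repeat the ones used above:

```python
def build_ddl(table, columns, comment,
              warehouse_root="/warehouse/ecommerce",
              staging_root="hdfs:/ecommerce"):
    """Render the DROP / CREATE / LOAD statements for one table.

    `columns` is a list of (name, hive_type) pairs.
    """
    column_lines = ",\n".join("  {} {}".format(name, hive_type)
                              for name, hive_type in columns)
    template = (
        "DROP TABLE IF EXISTS {table};\n"
        "CREATE TABLE {table} (\n"
        "{columns}\n"
        ")COMMENT '{comment}'\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'\n"
        "NULL DEFINED AS ''\n"
        "LOCATION '{warehouse}/{table}/';\n"
        "load data inpath '{staging}/{table}' into table {table};\n"
    )
    return template.format(table=table, columns=column_lines, comment=comment,
                           warehouse=warehouse_root, staging=staging_root)


if __name__ == "__main__":
    print(build_ddl(
        "t_shop_order",
        [("id", "BIGINT"), ("shop_id", "BIGINT"), ("order_id", "BIGINT"),
         ("start_time", "STRING"), ("done_time", "STRING")],
        "Shop order table",
    ))
```

The sketch does not handle per-column comments or quoting of special characters in the table comment; those would need extra care in real use.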

The tables are now visible in Hive, and the data has been loaded into them.

3.2 Importing Data from MySQL into Hive with Sqoop

Create the ecommerce database in Hive:
create database ecommerce;
Run the following function to import the data from MySQL into Hive:


sq()
{
./sqoop import \
--connect jdbc:mysql://hadoop102:3306/ecommerce \
--username root \
--password 000000 \
--table $1 \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-database ecommerce \
--hive-table $1
}
sq t_commodity
sq t_commodity_cate
sq t_coupon
sq t_coupon_member
sq t_coupon_order
sq t_delivery
sq t_feedback
sq t_member
sq t_member_addr
sq t_order
sq t_order_commodity
sq t_shop
sq t_shop_order
sq t_user
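
For readers who drive Sqoop from Python rather than a shell function, the same command line can be assembled as an argument list. This is a sketch under assumptions: build_sqoop_import is a hypothetical helper, it assumes sqoop is on the PATH (the shell version calls ./sqoop from its own directory), and the connection values are the ones used above:

```python
def build_sqoop_import(table, host="hadoop102", port=3306,
                       database="ecommerce", username="root",
                       password="000000"):
    """Return the sqoop import argument list for one table.

    Mirrors the flags of the sq() shell function above.
    """
    return [
        "sqoop", "import",
        "--connect", "jdbc:mysql://{}:{}/{}".format(host, port, database),
        "--username", username,
        "--password", password,
        "--table", table,
        "--num-mappers", "1",
        "--hive-import",
        # Pass the two characters backslash + t, as the shell's "\t" does
        "--fields-terminated-by", "\\t",
        "--hive-overwrite",
        "--hive-database", database,
        "--hive-table", table,
    ]


if __name__ == "__main__":
    # Print the command instead of running it; pass the list to
    # subprocess.check_call to actually execute it on a cluster node.
    print(" ".join(build_sqoop_import("t_user")))
```

Passing the password on the command line keeps the sketch close to the original; it is visible in the process list, so a production job would normally avoid that.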

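IV. Summary

4.1 Key Features of DataX

1. Exchanges data between heterogeneous databases and file systems.
2. Built on a Framework + plugin architecture. The Framework handles most of the technical problems of high-speed data exchange, such as buffering, flow control, concurrency, and context loading, and exposes a simple interface to plugins; a plugin only needs to implement access to its own data system.
3. Data transfer is completed within a single process, entirely in memory, with no disk reads or writes and no IPC.
4. An open framework: developers can write a new plugin in very little time to quickly support a new database or file system.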
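
The Framework + plugin idea in point 2 can be illustrated with a toy example. This is not DataX's actual code or API; it is a minimal Python 3 sketch in which the class and function names (ListReader, ListWriter, run_job) are invented for illustration. The "framework" owns the bounded in-memory buffer and the threading, while the "plugins" only know how to read from or write to their own system:

```python
import queue
import threading

_DONE = object()  # sentinel that marks the end of the record stream


class ListReader:
    """Toy reader plugin: yields records from an in-memory list."""
    def __init__(self, rows):
        self.rows = rows

    def read(self):
        for row in self.rows:
            yield row


class ListWriter:
    """Toy writer plugin: collects records into a list."""
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


def run_job(reader, writer, buffer_size=2):
    """Toy framework: moves records from reader to writer in one process.

    The bounded queue provides buffering and simple flow control:
    the reader thread blocks when the writer falls behind.
    """
    channel = queue.Queue(maxsize=buffer_size)

    def produce():
        for record in reader.read():
            channel.put(record)
        channel.put(_DONE)

    producer = threading.Thread(target=produce)
    producer.start()
    count = 0
    while True:
        record = channel.get()
        if record is _DONE:
            break
        writer.write(record)
        count += 1
    producer.join()
    return count
```

Supporting a new source or sink in this toy model means writing another class with a read or write method; run_job itself stays unchanged, which is the point of the architecture.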

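4.2 Key Features of Sqoop

1. Imports data from relational databases into Hadoop components such as HDFS, Hive, or HBase, and exports data from Hadoop components back into relational databases.
2. Sqoop makes full use of the MapReduce framework: it generates a MapReduce job from the input conditions and runs it on the Hadoop cluster. Running the import or export on multiple nodes at once is more efficient than running several parallel imports or exports on a single node, and it also provides good concurrency and fault tolerance.
3. Supports insert and update modes; with the appropriate parameters, existing rows are updated and missing rows are inserted.
4. Better support for the mainstream relational databases developed outside China.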

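4.3 Differences Between Sqoop and DataX

1. Sqoop uses the MapReduce framework for import and export, whereas DataX extracts and loads data only on the single machine that runs it, which is typically much slower than Sqoop.
2. Sqoop can only move data between relational databases and Hadoop components. It cannot transfer data between Hadoop components (for example between Hive and HBase), nor between relational databases (for example between MySQL and Oracle). DataX can move data between relational databases and Hadoop components, between relational databases, and between Hadoop components.
3. Sqoop was built specifically for Hadoop and supports it well, whereas DataX may not support newer Hadoop versions.
4. Sqoop only supports data exchange between the officially supported relational databases and Hadoop components, whereas with DataX users can modify the files to suit their needs, build the corresponding rpm package, install it, and then use their own custom plugin.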

Original: https://blog.csdn.net/Davidchou3165/article/details/127472181
Author: Davidchou3165
Title: 使用DataX和sqoop将数据从MySQL导入Hive
