Doris大数据分析保姆级使用教程

目录

Doris安装

集群部署

官网下载地址 https://doris.apache.org/zh-CN/downloads/downloads.html,选择二进制下载,源码下载需要自己编译

Doris大数据分析保姆级使用教程

解压doris文件

tar -zxvf apache-doris-1.0.0-incubating-bin.tar.gz -C /opt/module/

集群规划

hadoop102hadoop103hadoop104FE-LeaderFE-FlowerFE-Flower/ObserverBEBEBEBROKERBROKERBROKER

FE部署

修改配置文件 vim conf/fe.conf

meta_dir = /opt/module/doris-meta

Doris大数据分析保姆级使用教程

集群中分发存储路径和FE配置文件,启动FE

创建meta文件夹存储路径
mkdir /opt/module/doris-meta
三台机器都要执行
sh bin/start_fe.sh --daemon

Doris大数据分析保姆级使用教程

BE部署

修改配置文件 vim conf/be.conf

storage_root_path配置存储目录,可以用;来指定多个目录,每个目录后可以跟逗号,指定大小默认GB
storage_root_path = /opt/module/doris_storage1,10;/opt/module/doris_storage2

Doris大数据分析保姆级使用教程

集群中分发存储路径和BE配置文件,启动BE

创建storage_root_path存储路径
mkdir /opt/module/doris_storage1
mkdir /opt/module/doris_storage2
三台机器都要执行
sh bin/start_be.sh --daemon

Doris大数据分析保姆级使用教程

访问doris pe节点

doris可以使用mysql客户端访问,如果未安装,则需要安装mysql-client

第一次访问不需要密码,可以自行设置密码
mysql -hdoris1 -P 9030 -uroot
修改密码
set password for 'root' = password('root');

Doris大数据分析保姆级使用教程

添加BE节点

通过mysql客户端登入后,添加be节点,port为be上的heartbeat_service_port端口,默认9050

mysql> ALTER SYSTEM ADD BACKEND "hadoop102:9050";
mysql> ALTER SYSTEM ADD BACKEND "hadoop103:9050";
mysql> ALTER SYSTEM ADD BACKEND "hadoop104:9050";

通过mysql客户端,检测be节点状态,alive必须为true

mysql> SHOW PROC '/backends';

Doris大数据分析保姆级使用教程

BROKER部署

可选,非必须部署,启动BROKER

三台集群都要启动
sh bin/start_broker.sh --daemon

Doris大数据分析保姆级使用教程

使用mysql客户端访问pe,添加broker节点

mysql> ALTER SYSTEM ADD BROKER broker_name "hadoop102:8000","hadoop103:8000","hadoop104:8000";

查看broker状态

mysql> SHOW PROC "/brokers";

Doris大数据分析保姆级使用教程

扩容缩容

Doris可以很方便的扩容和缩容FE、BE、Broker实例。通过页面访问进行监控,访问8030,账户为root,密码默认为空不用填写,除非上述设置了密码使用密码登录 http://hadoop102:8030

Doris大数据分析保姆级使用教程

; FE 扩容和缩容

FE 节点的扩容和缩容过程,不影响当前系统运行

使用mysql登录客户端后,可以使用sql命令查看FE状态,目前就一台FE

mysql> SHOW PROC '/frontends';

Doris大数据分析保姆级使用教程

Doris大数据分析保姆级使用教程

增加FE节点,FE分为Leader,Follower和Observer三种角色。默认一个集群只能有一个Leader,可以有多个Follower和Observer.其中Leader和Follower组成一个Paxos选择组,如果Leader宕机,则剩下的Follower会成为Leader,保证HA。Observer是负责同步Leader数据的不参与选举。如果只部署一个FE,则FE默认就是Leader

第一个启动的FE自动成为Leader。在此基础上,可以添加若干Follower和Observer。添加Follower或Observer。使用mysql-client连接到已启动的FE,并执行:

在doris2部署Follower,doris3上部署Observer

执行其中的一个即可,注解如下
follower/observer_host IP节点位置
edit_log_port fe.conf配置文件中可以查询到

ALTER SYSTEM ADD FOLLOWER "follower_host:edit_log_port";
ALTER SYSTEM ADD FOLLOWER "hadoop103:9010";
ALTER SYSTEM ADD OBSERVER "observer_host:edit_log_port";
ALTER SYSTEM ADD OBSERVER "hadoop104:9010";

Doris大数据分析保姆级使用教程

需要重启配置节点的FE,并添加如下参数启动

--helper参数指定leader地址和端口号
sh bin/start_fe.sh --helper hadoop102:9010 --daemon
sh bin/start_fe.sh --helper hadoop102:9010 --daemon

全部启动完毕后,再通过mysql客户端,查看FE状况

mysql> SHOW PROC '/frontends';

Doris大数据分析保姆级使用教程

使用以下命令删除对应的FE节点 ALTER SYSTEM DROP FOLLOWER[OBSERVER] "fe_host:edit_log_port"; 删除Follower FE时,确保最终剩余的Follower(包括 Leader)节点为奇数

ALTER SYSTEM DROP FOLLOWER "hadoop103:9010";
ALTER SYSTEM DROP OBSERVER "hadoop104:9010";

BE 扩容和缩容

增加BE节点,就像上面安装一样在mysql客户端,使用 ALTER SYSTEM ADD BACKEND

删除BE节点,使用 ALTER SYSTEM DROP BACKEND "be_host:be_heartbeat_service_port";

具体文档请查看官网

Doris操作手册

创建用户


mysql -hhadoop102 -P 9030 -uroot

mysql> create user 'test' identified by 'test';

mysql> exit;
mysql -hhadoop102 -P 9030 -utest -ptest

表操作


mysql> create database test_db;

mysql> grant all  on test_dn to test;

mysql> use test_db;

分区表

分区表分为单分区和复合分区

单分区表,建立一张student表。分桶列为id,桶数为10,副本数为1

CREATE TABLE student
(
id INT,
name VARCHAR(50),
age INT,
count  BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");

复合分区表,第一级称为Partition,即分区。用户指定某一维度列做为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。第二级称为Distribution,即分桶。用户可以指定一个或多个维度列以及桶数进行HASH分布


CREATE TABLE student2
(
dt DATE,
id INT,
name VARCHAR(50),
age INT,
count  BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (dt,id,name,age)
PARTITION BY RANGE(dt)
(
  PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
  PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
  PARTITION p202009 VALUES LESS THAN ('2020-10-01')
)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");

数据模型

AGGREGATE KEY

AGGREGATE KEY相同时,新旧记录将会进行聚合操作

AGGREGATE KEY模型可以提前聚合数据,适合报表和多维度业务

UNIQUE KEY

UNIQUE KEY相同时,新记录覆盖旧记录。目前UNIQUE KEY和AGGREGATE KEY的REPLACE聚合方法一致。适用于有更新需求的业务。

DUPLICATE KEY

只指定排序列,相同的行并不会合并。适用于数据无需提前聚合的分析业务

数据导入

为适配不同的数据导入需求,Doris系统提供5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的方式(异步、同步)

Broker load

Broker load是一个导入的异步方式,支持的数据源取决于Broker进程支持的数据源

基本原理:用户在提交导入任务后,FE(Doris系统的元数据和调度节点)会生成相应的PLAN(导入执行计划,BE会执行导入计划将输入导入Doris中)并根据BE(Doris系统的计算和存储节点)的个数和文件的大小,将Plan分给多个BE执行,每个BE导入一部分数据。BE在执行过程中会从Broker拉取数据,在对数据转换之后导入系统。所有BE均完成导入,由FE最终决定是否导入是否成功。

测试导入HDFS数据到Doris

编写测试文件,上传到HDFS

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oeF1RTzO-1651116594021)(https://gitee.com/czshh0628/blog-images/raw/master/image-20220428095216188.png)]

创建doris表,测试导入

CREATE TABLE student
(
id INT,
name VARCHAR(50),
age INT,
count  BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");

编写diros导入sql,更多参数请看官网

LOAD LABEL test_db.label1
(
    DATA INFILE("hdfs://bigdata:8020/student")
    INTO TABLE student
    COLUMNS TERMINATED BY ","
    (id,name,age,count)
    SET
    (
        id=id,
        name=name,
        age=age,
        count=count
    )
)
WITH BROKER broker_name
(
    "username"="root"
)
PROPERTIES
(
    "timeout" = "3600"
);

查看doris导入状态

use test_db;
show load;

Doris大数据分析保姆级使用教程

查看数据导入是否成功

Doris大数据分析保姆级使用教程

Routine Load

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能

从Kafka导入数据到Doris

创建kafka主题

kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic test

启动kafka生产者生产数据

kafka-console-producer.sh --broker-list bigdata:9092 --topic test

数据格式
{"id":"4","name":"czsqhh","age":"18","count":"50"}

在doris中创建对应表

CREATE TABLE kafka_student
(
id INT,
name VARCHAR(50),
age INT,
count  BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");

Doris大数据分析保姆级使用教程

创建导入作业, desired_concurrent_number指定并行度

CREATE ROUTINE LOAD test_db.job1 on kafka_student
PROPERTIES
(
    "desired_concurrent_number"="1",
    "strict_mode"="false",
    "format"="json"
)
FROM KAFKA
(
    "kafka_broker_list"= "bigdata:9092",
    "kafka_topic" = "test",
    "property.group.id" = "test"
);

查看作业状态

SHOW ROUTINE LOAD;

Doris大数据分析保姆级使用教程

控制作业

STOP ROUTINE LOAD For jobxxx :停止作业

PAUSE ROUTINE LOAD For jobxxx:暂停作业

RESUME ROUTINE LOAD For jobxxx:重启作业

数据导出

Drois导出数据到HDFS

其他参数详见官网

EXPORT TABLE test_db.student
PARTITION (student)
TO "hdfs://bigdata:8020/doris/student/"
WITH BROKER broker_name
(
    "username" = "root"
);

Doris大数据分析保姆级使用教程

Doris代码操作

Spark

引入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-core_2.12artifactId>
        <version>3.0.0version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-yarn_2.12artifactId>
        <version>3.0.0version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-sql_2.12artifactId>
        <version>3.0.0version>
    dependency>

    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.27version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-hive_2.12artifactId>
        <version>3.0.0version>
    dependency>

    <dependency>
        <groupId>org.apache.hivegroupId>
        <artifactId>hive-execartifactId>
        <version>1.2.1version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-streaming_2.12artifactId>
        <version>3.0.0version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-streaming-kafka-0-10_2.12artifactId>
        <version>3.0.0version>
    dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.coregroupId>
        <artifactId>jackson-coreartifactId>
        <version>2.10.1version>
    dependency>

    <dependency>
        <groupId>com.alibabagroupId>
        <artifactId>druidartifactId>
        <version>1.1.10version>
    dependency>
dependencies>

读取doris数据

object ReadDoris {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("testReadDoris").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val df = sparkSession.read.format("jdbc")
      .option("url", "jdbc:mysql://bigdata:9030/test_db")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "student")
      .load()

    df.show()

    sparkSession.close();
  }

}

Doris大数据分析保姆级使用教程

Flink

引入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-javaartifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-streaming-java_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-clients_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.projectlombokgroupId>
        <artifactId>lombokartifactId>
        <version>1.18.16version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-kafka_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-elasticsearch7_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.bahirgroupId>
        <artifactId>flink-connector-redis_2.12artifactId>
        <version>1.1-SNAPSHOTversion>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-table-api-java-bridge_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-table-planner_2.12artifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-table-commonartifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-csvartifactId>
        <version>1.14.3version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-jdbc_2.12artifactId>
        <version>1.14.3version>
    dependency>
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>8.0.23version>
    dependency>

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-kafka_2.12artifactId>
        <version>1.14.3version>
    dependency>

dependencies>

读取数据

public static void main(String[] args) {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    String sourceSql = "CREATE TABLE student (\n" +
            "id Integer,\n" +
            "name STRING,\n" +
            "age Integer\n" +
            ")WITH (\n" +
            "'connector'='jdbc',\n" +
            "'url' = 'jdbc:mysql://bigdata:9030/test_db',\n" +
            "'username'='root',\n" +
            "'password'='root',\n" +
            "'table-name'='student'\n" +
            ")";
    tEnv.executeSql(sourceSql);

    Table table = tEnv.sqlQuery("select * from student");
    table.execute().print();
}

Doris大数据分析保姆级使用教程

Original: https://blog.csdn.net/xiaoyixiao_/article/details/124446025
Author: 笑一笑0628
Title: Doris大数据分析保姆级使用教程

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/694134/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球