An Open-Source Data Quality Solution: A Beginner's Guide to Apache Griffin

When people hear the name Griffin, they usually think of the basketball star or an esports team. In the big data world, however, Apache Griffin (hereafter simply Griffin) is the leading name in data quality. To put it plainly: Griffin is the only Apache project dedicated to big data quality monitoring.

​ In the early days of big data, when data quality was not a priority, Griffin drew little attention. But as data governance programs are rolled out across more and more enterprises, data quality problems have moved into the spotlight.

​ As before, commercial solutions are out of scope for this article. The 大数据流动 (Big Data Flow) public account currently focuses its data governance tooling research on open-source projects, hoping to assemble a governance toolbox that fits each company through open source plus in-house customization. If solid commercial options appear in the future, we will keep an eye on them as well.

This article is organized into six parts: data quality, a Griffin overview, the Griffin architecture, a Griffin quick start, Griffin batch data in practice, and Griffin streaming data in practice. The goal is to get you started with a data quality management tool quickly.

Because of plagiarism concerns, later advanced Griffin articles may be paywalled. You are welcome to join the data governance / Griffin discussion groups early; the latest articles and materials will be shared there first.

1. Data Quality

​ Data Quality Management (DQM) refers to the activities of identifying, measuring, monitoring, and alerting on the data quality problems that can arise at every stage of the data lifecycle: planning, acquisition, storage, sharing, maintenance, use, and retirement. By improving the organization's management practices, these activities drive data quality steadily higher.

​ Why do we need data quality management in the first place?

​ In the big data era, what matters about data is not that it is "big" but that it is "valuable", and value hinges on "quality". In reality, data is riddled with problems:

  • Records cannot be matched across systems
  • Data cannot be identified or interpreted
  • Data is not timely
  • Data is inconsistent
  • ...

​ So what should solving data quality problems achieve?

In short: the data must be trustworthy and usable.

There are not many mature data quality management tools, so this article skips a pointless comparison and goes straight to the subject: Apache Griffin.

2. Griffin Overview

​ Griffin is an open-source big data quality solution originally open-sourced by eBay. It supports both batch and streaming quality checks and is a Data Quality Service Platform (DQSP) built on Hadoop and Spark. It provides a comprehensive framework for tasks such as defining data quality models, executing data quality measurements, automating data profiling and validation, and presenting a unified data quality view across multiple data systems.

Griffin entered the Apache Incubator in December 2016, and on 12 December 2018 the Apache Software Foundation announced that Apache Griffin had graduated to become a top-level project.

Griffin website: https://griffin.apache.org/

GitHub repository: https://github.com/apache/griffin

In eBay's data quality practice, fixing data quality problems took a long time; whether in batch or streaming pipelines, the cost of resolving quality issues was enormous, and so a unified data quality system was born.

The official description has long since been updated to call Apache Griffin a batch and streaming data quality solution, and the project is clearly moving toward a unified data quality management platform.

Griffin's key characteristics include, among others:

* Scalability: designed to work on large data volumes; the eBay deployment currently processes roughly 1.2 PB of data.
* Self-service: Griffin provides a clean, easy-to-use UI for managing data assets and data quality rules; users can view quality results on the dashboard and customize what is displayed.

Apache Griffin currently supports HIVE, CUSTOM, AVRO, and KAFKA as data sources; support for MySQL and other relational databases can be added as an extension when needed.

Of course, Griffin is no silver bullet. It still has plenty of rough edges, so choose it with care:

The Griffin community is not very active, so there are few people to discuss issues with.

The latest release is still 0.6, so some rough edges are to be expected.

3. Griffin Architecture

​ A data quality module is an indispensable component of any big data platform. As an open-source big data quality solution, Griffin supports both batch and streaming checks and can measure data assets along multiple dimensions, for example verifying after an offline job that row counts match between source and target, or counting null values in a source table, thereby improving the accuracy and trustworthiness of the data.

Griffin's architecture consists of three main parts: Define, Measure, and Analyze.

Their responsibilities are as follows:

  • Define: specifies the dimensions of the data quality measurement, such as the time span and the statistics to compute (whether source and target row counts match; the number of non-null values, distinct values, the max, the min, the counts of the top-5 values of a column in a data source, and so on)
  • Measure: executes the measurement jobs and produces the statistics
  • Analyze: stores and visualizes the results

That may sound abstract, so let's walk through the execution flow of a complete Griffin job.

  • Register the data: register the data sources whose quality you want to check with Griffin.
  • Define a measure: configure what to evaluate, for example an accuracy comparison between a source and a target table, or profiling statistics on a single table.
  • Configure a scheduled job that is submitted to the Spark cluster to run the check periodically.
  • Review the results: the computed metrics are stored (for example in Elasticsearch) and can be viewed on the dashboard.

The Griffin system is divided into three layers: the Data Collection & Processing Layer, the Backend Service Layer, and the User Interface.

Data processing and storage layer:

For batch analysis, the data quality model computes quality metrics on the Spark cluster from data sources stored in Hadoop.

For near-real-time analysis, data is consumed from a messaging system, and the data quality model then computes real-time quality metrics on the Spark cluster. For storage, Elasticsearch can be used in the backend to serve front-end requests.

Apache Griffin service:

The project exposes RESTful services covering all of Apache Griffin's functionality, such as exploring data sets, creating data quality measures, publishing metrics, retrieving metrics, and adding subscriptions. Developers can therefore build their own user interfaces on top of these web services.

This flexibility is one of the reasons Griffin keeps gaining adoption.
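As a rough illustration, the two calls below list the registered measures and fetch the collected metrics. The endpoint paths are assumptions based on the Griffin 0.6 REST API and should be verified against the API documentation of your deployment; the host and port assume the default Spring Boot setup described later in this article.

# list the measures that have been defined (assumed endpoint of the Griffin service)
curl -s http://localhost:8080/api/v1/measures

# fetch the latest metric values produced by the measurement jobs
curl -s http://localhost:8080/api/v1/metrics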

4. Griffin Quick Start

The latest Griffin release is 0.6.0, and the installation and deployment described here are based on that version.

Prerequisites

JDK (1.8 or later versions)
MySQL (version 5.6 or later)
Hadoop (2.6.0 or later)
Hive (version 2.x)
Spark (version 2.2.1)
Livy(livy-0.5.0-incubating)
ElasticSearch (5.0 or later versions)

Most of these components already ship with CDH, so here we only cover how to deploy Livy and Elasticsearch.

Livy is a REST service for Spark.

https://livy.apache.org/

Prepare the Livy installation package.

  1. Extract the Livy package into the /opt/ directory
  2. Create the livy user and the log/run directories, and change the owner of Livy's home directory to livy:hadoop
useradd livy -g hadoop
mkdir /var/log/livy
mkdir /var/run/livy
chown livy:hadoop /var/log/livy
chown livy:hadoop /var/run/livy
chown -R livy:hadoop /opt/cloudera/apache-livy-0.6.0-incubating-bin/

3. Go to the Livy home directory and create the livy.conf, livy-env.sh, and spark-blacklist.conf files under the conf directory.

4. Edit livy.conf and add the following:

livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.csrf_protection.enabled = false
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/tmp/livy

5. Edit livy-env.sh and add the Hadoop and Spark settings, for example:

export JAVA_HOME=/usr/java/jdk1.8.0_181
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export SPARK_CONF_DIR=/etc/spark2/conf
export SPARK_HOME=/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh6.3.2.p0.1041012/lib/spark2
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"

6. Edit spark-blacklist.conf:

# Configuration override / blacklist. Defines a list of properties that users are not allowed
# to override when starting Spark sessions.
#
# This file takes a list of property names (one per line). Empty lines and lines starting with "#"
# are ignored.

# Disallow overriding the master and the deploy mode.
spark.master
spark.submit.deployMode

# Disallow overriding the location of Spark cached jars.
spark.yarn.jar
spark.yarn.jars
spark.yarn.archive

# Don't allow users to override the RSC timeout.
livy.rsc.server.idle-timeout
7. In Cloudera Manager, add the following to the "Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml":
<property>
    <name>hadoop.proxyuser.livy.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.livy.hosts</name>
    <value>*</value>
</property>

8. Create Livy's home directory on HDFS:

sudo -u hdfs hadoop fs -mkdir /user/livy
sudo -u hdfs hadoop fs -chown livy:supergroup /user/livy

9. Start the Livy service:

livy-server start
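A quick way to verify that Livy is up is to call its REST API (the port matches livy.server.port configured above):

# should return a JSON list of sessions (empty right after startup)
curl -s http://localhost:8998/sessions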

Installing Elasticsearch 5 (the installation package is also included in the resource bundle):

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.15.tar.gz
tar -zxvf elasticsearch-5.6.15.tar.gz
cd elasticsearch-5.6.15
sh ./bin/elasticsearch
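To confirm Elasticsearch is running, query its health endpoint (standard ES REST API; adjust the host if ES runs on another node):

# cluster status of green or yellow is fine for a single-node test setup
curl -s http://localhost:9200/_cluster/health?pretty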

Configuration

1. First, initialize the Quartz tables in MySQL using the script Init_quartz_mysql_innodb.sql.

The script can be obtained from the resource bundle shared in the Griffin discussion group.

mysql -u <username> -p <database> < Init_quartz_mysql_innodb.sql
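To double-check that the Quartz tables were created, a hypothetical verification (assuming the target database is the one you passed above) looks like this:

# the script creates tables prefixed with QRTZ_
mysql -u <username> -p -e "SHOW TABLES LIKE 'QRTZ%';" <database>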

2. Hadoop and Hive:

Copy the Hadoop configuration files from the Hadoop server to the Livy server; here we assume they are placed under /usr/data/conf.

On the Hadoop server, create the /home/spark_conf directory on HDFS and upload Hive's hive-site.xml into it:

# create the /home/spark_conf directory
hadoop fs -mkdir -p /home/spark_conf
# upload hive-site.xml
hadoop fs -put hive-site.xml /home/spark_conf/

3. Set the environment variables:

#!/bin/bash
export JAVA_HOME=/data/jdk1.8.0_192

# Spark home
export SPARK_HOME=/usr/data/spark-2.1.1-bin-2.6.3
# Livy bin directory
export LIVY_HOME=/usr/data/livy/bin
# Hadoop configuration directory
export HADOOP_CONF_DIR=/usr/data/conf

4. Configure and start Livy.

Update the livy.conf file under livy/conf:

livy.server.host = 127.0.0.1
livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.repl.enable-hive-context = true

Start Livy:

livy-server start

5. Elasticsearch configuration:

Create the griffin index in ES:

curl -XPUT http://es:9200/griffin -d '
{
    "aliases": {},
    "mappings": {
        "accuracy": {
            "properties": {
                "name": {
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    },
                    "type": "text"
                },
                "tmst": {
                    "type": "date"
                }
            }
        }
    },
    "settings": {
        "index": {
            "number_of_replicas": "2",
            "number_of_shards": "5"
        }
    }
}
'
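You can confirm that the index and the accuracy mapping were created with a standard ES query:

curl -s http://es:9200/griffin?pretty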

Next, download the Griffin 0.6.0 source code from GitHub. The source tree is clearly organized into four modules: griffin-doc, measure, service, and ui. griffin-doc holds the documentation; measure interacts with Spark and executes the measurement jobs; service is a Spring Boot application that exposes the RESTful APIs used by the ui module, stores the measurement jobs, and serves the results for display.

Before building, edit the following configuration files under the service module:

application.properties: MySQL, Hive, and ES connection settings

quartz.properties: Quartz scheduler settings

sparkProperties.json: parameters for the measure jobs submitted to Spark through Livy
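A trimmed sketch of the key application.properties entries is shown below. The property names follow the Griffin 0.6 service defaults, while all hostnames, ports, and credentials are placeholders for your own environment:

# MySQL (Quartz) datasource
spring.datasource.url=jdbc:mysql://<mysql-host>:3306/quartz?autoReconnect=true&useSSL=false
spring.datasource.username=<username>
spring.datasource.password=<password>

# Hive metastore used to browse data assets
hive.metastore.uris=thrift://<metastore-host>:9083
hive.metastore.dbname=default

# Elasticsearch used to store and query metrics
elasticsearch.host=<es-host>
elasticsearch.port=9200
elasticsearch.scheme=http

# Livy endpoint used to submit measure jobs to Spark
livy.uri=http://<livy-host>:8998/batches

# YARN ResourceManager UI, used to track job state
yarn.uri=http://<yarn-host>:8088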

Once the configuration files are updated, run the following Maven command (for example from the terminal in IDEA) to build and package the project:

mvn -Dmaven.test.skip=true clean install

When the command finishes, you will find service-0.6.0.jar and measure-0.6.0.jar under the target directories of the service and measure modules; copy both jars to a directory on the server.

1. Rename the measure jar and upload it to the /griffin directory on HDFS:

# rename the jar
mv measure-0.6.0.jar griffin-measure.jar
# upload griffin-measure.jar to the HDFS directory
hadoop fs -put griffin-measure.jar /griffin/

2. Run service-0.6.0.jar to start the Griffin admin service:

nohup java -jar service-0.6.0.jar>service.out 2>&1 &

After a few seconds you can open Apache Griffin's default UI (by default, Spring Boot listens on port 8080):

http://IP:8080

You can then explore the result pages in the UI.

5. Griffin Batch Data in Practice

This walkthrough follows the official batch example.

1. Create the tables demo_src and demo_tgt in Hive:

--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE demo_src(
  id bigint,
  age int,
  desc string)
PARTITIONED BY (
  dt string,
  hour string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_src';

--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE demo_tgt(
  id bigint,
  age int,
  desc string)
PARTITIONED BY (
  dt string,
  hour string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_tgt';

2. Generate test data:

Download all the files from http://griffin.apache.org/data/batch/ to the Hadoop server, then run the gen-hive-data.sh script with:

nohup ./gen-hive-data.sh>gen.out 2>&1 &

Watch the gen.out log file and adjust as needed if errors appear. In my test environment, Hadoop and Hive are installed on the same server, so the script is run directly there.
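As a quick sanity check (assuming Hive CLI access on the same node), you can confirm that partitions and rows are being generated:

hive -e "SHOW PARTITIONS demo_src; SELECT COUNT(*) FROM demo_src;"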

3. Create the measurement job through the UI.

Open the DataAssets page to view the registered data assets.

Click Measures to open the measure creation page (the demo uses an Accuracy measure).

Select the source data set.

Select the target data set.

Map the source and target fields to each other.

Set the remaining parameters.

Review the configuration and submit.

Add a scheduled job.

Define the job schedule with a cron expression.
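For example, a Quartz cron expression like the following schedules the check every four minutes (pick an interval that matches how often new partitions arrive):

0 0/4 * * * ?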

Click DQ Metrics to see the results.

6. Griffin Streaming Data in Practice

Again, this walkthrough follows the official example.

The sample streaming data looks like this:

{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...

The official site also provides scripts for generating test data: https://griffin.apache.org/data/streaming/ (also included in the resource bundle).

The script keeps writing data into Kafka continuously:

#!/bin/bash

#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

#every minute
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e
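Before wiring up the Flink job, you can confirm that records are landing in the source topic with the standard Kafka console consumer (broker address as used in the topic-creation commands above):

kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic source --from-beginning --max-messages 5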

The Flink part simply consumes records from Kafka and forwards them downstream; a cleaned-up fragment (with the actual transformation logic omitted) looks like this:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// consume the raw JSON records from the source topic
String inputTopic = "source";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop101:9092");
properties.setProperty("group.id", "griffin-flink-demo"); // group id is arbitrary for this demo
FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> dataStream = env.addSource(kafkaConsumer);

// ... actual transformation logic omitted
DataStream<String> target = dataStream;

// forward the processed records to the target topic
target.addSink(new FlinkKafkaProducer010<String>(
        "hadoop101:9092",
        "target",
        new SimpleStringSchema()
));
target.print();
env.execute();

This works together with the following env.json:

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

and the corresponding dq.json:

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/target",
        "info.path": "target",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out":[
          {
            "type":"metric",
            "name": "accu"
          },
          {
            "type":"record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

Submit the job:

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
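Once the job is running, the accuracy metrics go to the console sink and are persisted to the HDFS path configured in env.json, so a quick way to inspect them is:

hadoop fs -ls /griffin/persist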

7. Summary

Data quality management tools are still scarce. Griffin offers more than an implementation: it embodies an approach to data quality management, which is valuable even if you end up building your own system.

In addition, the 数据治理工具箱 (Data Governance Toolbox) knowledge planet has been set up as a community focused on putting data governance into practice. Articles and materials on data governance published by 大数据流动 (including paid content) will be synced there over the long term. The goal is to collect materials on data governance tooling, to organize hands-on study groups regularly, to keep these materials available for the long run, and to curb the constant plagiarism of articles. Everyone is welcome to join.

Original: https://www.cnblogs.com/tree1123/p/16481069.html
Author: 独孤风
Title: An Open-Source Data Quality Solution: A Beginner's Guide to Apache Griffin
