Mention "Griffin" and most people think of a basketball star or an esports team, but in the big-data world Apache Griffin (hereafter Griffin) is the leading name in data quality. To put it plainly: Griffin is the only Apache project dedicated to big-data quality monitoring.

In the period when data quality received little attention, Griffin drew little interest; but as data governance has been rolled out and put into practice across many enterprises, data quality problems have started to be taken seriously.

As before, commercial solutions are outside the scope of this article. The Big Data Flow (大数据流动) public account currently focuses its data-governance tooling research on open source, hoping that a combination of open source plus in-house customization yields a governance toolbox that fits one's own company. If reliable commercial options emerge, we will keep an eye on them as well.

This article is organized into six parts: data quality, an introduction to Griffin, Griffin's architecture, a Griffin quick start, a batch-data walkthrough, and a streaming-data walkthrough, with the goal of getting you up and running with a data quality management tool quickly.
Because of plagiarism concerns, later advanced articles on Griffin may be paywalled; you are welcome to join the data governance and Griffin discussion groups early, where the latest articles and materials are synced in real time.
1. Data Quality
Data Quality Management (DQM) refers to a series of management activities (identification, measurement, monitoring, and alerting) applied to the data quality problems that can arise at each stage of the data lifecycle: planning, acquisition, storage, sharing, maintenance, application, and retirement. By improving the organization's management practices, data quality is raised further.
Why does data quality management exist at all?

In the big-data era, the core of data is not that it is "big" but that it is "valuable", and the key to value is "quality". In reality, though, data often suffers from problems such as:

- records that cannot be matched
- data that cannot be identified
- poor timeliness
- inconsistency
- ...

So what goals should solving data quality aim for?

In short: data that is trustworthy and usable.
There are not many mature data quality management tools, so this article skips a pointless comparison and goes straight to the point: Apache Griffin.
2. Introduction to Griffin
Griffin is an open-source big-data data quality solution, open-sourced by eBay. It supports both batch and streaming quality checks and is a Data Quality Service Platform (DQSP) built on Hadoop and Spark. It provides a comprehensive framework for tasks such as defining data quality models, executing data quality measurements, automating data profiling and validation, and unified data quality visualization across multiple data systems.

Griffin entered the Apache Incubator in December 2016, and on December 12, 2018 the Apache Software Foundation officially announced that Apache Griffin had graduated to become a top-level Apache project.

Griffin website: https://griffin.apache.org/
GitHub: https://github.com/apache/griffin

In eBay's data quality management practice, fixing data quality problems took a very long time; whether batch or streaming, the cost of resolving them was enormous, and a unified data quality system was born from that need.

The official definition has long since been updated to describe Apache Griffin as a batch and streaming data quality solution; the project is working toward becoming a unified data quality management platform.

Griffin's main features include:
* Scalability: it works in large-data-volume environments; eBay's deployment currently processes roughly 1.2 PB.
* Self-service: Griffin provides a clean, easy-to-use UI for managing data assets and data quality rules; users can view quality results and customize what the dashboard displays.

Apache Griffin's current data sources include HIVE, CUSTOM, AVRO, and KAFKA; MySQL and other relational databases can be supported through extensions as needed.

Of course Griffin is no silver bullet. It still has plenty of rough edges, so choose carefully:

- The community is not very active, so there are few people to discuss problems with.
- The latest release is still 0.6, and some issues can be expected.
3. Griffin Architecture
A data quality module is an indispensable component of a big-data platform. As an open-source big-data quality solution supporting both batch and streaming checks, Griffin can measure data assets along different dimensions (for example, checking after an offline job finishes that record counts match between source and target, or counting null values in a source table), thereby improving data accuracy and trustworthiness.

Griffin's architecture is divided into three parts, Define, Measure, and Analyze, as shown below:

The responsibilities of each part:

- Define: defines the dimensions of a data quality measurement, such as its time span and its target (whether record counts match between source and target; for a given field, the number of non-null values, the number of distinct values, the maximum, the minimum, the counts of the top-5 values, and so on)
- Measure: executes the measurement jobs and produces the results
- Analyze: stores and presents the results

That may sound abstract, so let's walk through the execution flow of a complete Griffin job:

- Register the data sources whose quality you want to check with Griffin.
- Configure a scheduled job, submitted to the Spark cluster, that checks the data periodically.
The Griffin system consists of a Data Collection & Processing Layer, a Backend Service Layer, and a User Interface.

Data processing and storage layer:

For batch analysis, the data quality model computes quality metrics on the Spark cluster from data sources in Hadoop.

For near-real-time analysis, data is consumed from a messaging system, and the data quality model then computes real-time quality metrics on the Spark cluster. For storage, Elasticsearch can be used on the backend to serve frontend requests.

Apache Griffin service:

The project provides RESTful services covering all of Apache Griffin's functionality, such as exploring datasets, creating data quality measures, publishing metrics, retrieving metrics, and adding subscriptions. Developers can therefore build their own user interfaces on top of these web services.

This flexibility is one reason Griffin keeps finding more and more adopters.
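As a hedged sketch of what calling those REST services might look like (the /api/v1 paths follow the Apache Griffin API guide, but host, port, and exact paths are assumptions to verify against your deployed version):

```shell
# Assumed Griffin service endpoint; adjust host/port for your deployment.
GRIFFIN_URL="http://localhost:8080"

# These calls require a running Griffin service, so they are shown commented out:
# curl "$GRIFFIN_URL/api/v1/measures"   # list defined data quality measures
# curl "$GRIFFIN_URL/api/v1/jobs"       # list scheduled measurement jobs

# What such a script can always do locally is assemble the request URL:
echo "$GRIFFIN_URL/api/v1/measures"
```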
4. Griffin Quick Start
The latest Griffin release is 0.6.0, and the installation below is based on that version.

Prerequisites
JDK (1.8 or later versions)
MySQL (version 5.6 or later)
Hadoop (2.6.0 or later)
Hive (version 2.x)
Spark (version 2.2.1)
Livy(livy-0.5.0-incubating)
ElasticSearch (5.0 or later versions)
Most of these already ship with CDH; here we focus on how to deploy Livy and Elasticsearch in particular.

Livy is a REST server for Spark.

Prepare the Livy installation package.

1. Extract the Livy package into the /opt/ directory.
2. Create the livy user and log directories, and change the owner of Livy's home directory to livy:hadoop:
useradd livy -g hadoop
mkdir /var/log/livy
mkdir /var/run/livy
chown livy:hadoop /var/log/livy
chown livy:hadoop /var/run/livy
chown -R livy:hadoop /opt/cloudera/apache-livy-0.6.0-incubating-bin/
3. Go to the Livy home directory and create the configuration files livy.conf, livy-env.sh, and spark-blacklist.conf under the conf directory.
4. Edit livy.conf and add the following:
livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.csrf_protection.enabled = false
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/tmp/livy
5. Edit livy-env.sh to add the Hadoop and Spark settings, as follows:
export JAVA_HOME=/usr/java/jdk1.8.0_181
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export SPARK_CONF_DIR=/etc/spark2/conf
export SPARK_HOME=/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh6.3.2.p0.1041012/lib/spark2
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
6. Edit spark-blacklist.conf:

# Configuration override / blacklist. Defines a list of properties that users are not allowed
# to override when starting Spark sessions.
#
# This file takes a list of property names (one per line). Empty lines and lines starting with "#"
# are ignored.
#
# Disallow overriding the master and the deploy mode.
spark.master
spark.submit.deployMode

# Disallow overriding the location of Spark cached jars.
spark.yarn.jar
spark.yarn.jars
spark.yarn.archive

# Don't allow users to override the RSC timeout.
livy.rsc.server.idle-timeout
7. Add the following to the cluster-wide advanced configuration snippet (safety valve) for core-site.xml:

<property>
  <name>hadoop.proxyuser.livy.groups</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.livy.hosts</name>
  <value>*</value>
</property>
8. Create Livy's home directory on HDFS:
sudo -u hdfs hadoop fs -mkdir /user/livy
sudo -u hdfs hadoop fs -chown livy:supergroup /user/livy
9. Start the Livy service:
livy-server start
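Once Livy is up, it can be smoke-tested over its REST API (the payload follows Livy's documented /sessions endpoint; the host and the memory size here are assumptions):

```shell
# Assumed Livy endpoint; 8998 is the port configured in livy.conf above.
LIVY_URL="http://localhost:8998"

# Session-creation payload for POST /sessions; "kind" selects the interpreter.
PAYLOAD='{"kind": "spark", "executorMemory": "1g"}'

# Sanity-check the payload locally before sending it anywhere.
echo "$PAYLOAD" | python3 -m json.tool

# With Livy running, list sessions and create one (commented out here):
# curl "$LIVY_URL/sessions"
# curl -H "Content-Type: application/json" -X POST -d "$PAYLOAD" "$LIVY_URL/sessions"
```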
Next, install Elasticsearch 5; the package is also included in the resource bundle.
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.15.tar.gz
tar -zxvf elasticsearch-5.6.15.tar.gz
cd elasticsearch-5.6.15
sh ./bin/elasticsearch
Configuration

1. First initialize the Quartz database in MySQL, using the script Init_quartz_mysql_innodb.sql.

The script is available in the resource bundle via the Griffin discussion group.
mysql -u <username> -p <password> < Init_quartz_mysql_innodb.sql
2. Hadoop and Hive:

Copy the configuration files from the Hadoop server to the Livy server; here we assume they are placed under /usr/data/conf.

On the Hadoop server, create the /home/spark_conf directory and upload Hive's configuration file hive-site.xml into it:

# create the /home/spark_conf directory
hadoop fs -mkdir -p /home/spark_conf
# upload hive-site.xml
hadoop fs -put hive-site.xml /home/spark_conf/
3. Set the environment variables:

#!/bin/bash
export JAVA_HOME=/data/jdk1.8.0_192
# Spark home
export SPARK_HOME=/usr/data/spark-2.1.1-bin-2.6.3
# Livy bin directory
export LIVY_HOME=/usr/data/livy/bin
# Hadoop configuration directory
export HADOOP_CONF_DIR=/usr/data/conf
4. Configure and start Livy.

Update the livy.conf configuration file under livy/conf:
livy.server.host = 127.0.0.1
livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.repl.enable-hive-context = true
Start Livy:
livy-server start
5. Elasticsearch configuration:

Create the griffin index in ES:
curl -XPUT http://es:9200/griffin -d '
{
  "aliases": {},
  "mappings": {
    "accuracy": {
      "properties": {
        "name": {
          "fields": {
            "keyword": {
              "ignore_above": 256,
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "tmst": {
          "type": "date"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_replicas": "2",
      "number_of_shards": "5"
    }
  }
}
'
Griffin's source tree is very clean, with four main modules: griffin-doc, measure, service, and ui. griffin-doc holds Griffin's documentation; measure interacts with Spark and executes the measurement jobs; service is implemented with Spring Boot and provides the RESTful APIs that the ui module needs, stores the measurement jobs, and presents the results.
Before building, update the following configuration files in the service module:

- application.properties: MySQL, Hive, and ES settings
- quartz.properties
- sparkProperties.json
Once the configuration files are updated, run the following Maven command in IDEA's terminal to build and package:
mvn -Dmaven.test.skip=true clean install
When the command finishes, you will find service-0.6.0.jar and measure-0.6.0.jar under the target directories of the service and measure modules; copy both jars to the server.
1. Upload measure-0.6.0.jar to the /griffin directory on HDFS with the following commands:

# rename the jar
mv measure-0.6.0.jar griffin-measure.jar
# upload griffin-measure.jar to HDFS
hadoop fs -put griffin-measure.jar /griffin/
2. Run service-0.6.0.jar to start the Griffin admin backend:
nohup java -jar service-0.6.0.jar>service.out 2>&1 &
After a few seconds we can access Apache Griffin's default UI (by default, Spring Boot listens on port 8080):
http://IP:8080
Some of the result screens look like this:
5. Griffin Batch Data in Practice
1. Create the tables demo_src and demo_tgt in Hive:

--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE demo_src (
  id bigint,
  age int,
  desc string)
PARTITIONED BY (
  dt string,
  hour string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 'hdfs:///griffin/data/batch/demo_src';

--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE demo_tgt (
  id bigint,
  age int,
  desc string)
PARTITIONED BY (
  dt string,
  hour string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 'hdfs:///griffin/data/batch/demo_tgt';
2. Generate test data:

Download all files from http://griffin.apache.org/data/batch/ onto the Hadoop server, then run the gen-hive-data.sh script:
nohup ./gen-hive-data.sh>gen.out 2>&1 &
Watch the gen.out log file and adjust as needed if errors appear. In my test environment, Hadoop and Hive are installed on the same server, so the script is run directly.
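If you would rather hand-roll a few sample rows than run the official script, the format is simple: pipe-delimited fields matching the table schema (id, age, desc). A minimal sketch (the file name and values here are made up):

```shell
# Emit pipe-delimited rows matching demo_src (id bigint, age int, desc string).
for i in 1 2 3; do
  echo "${i}|$((20 + i))|record_${i}"
done > demo_sample.dat
cat demo_sample.dat
# 1|21|record_1
# 2|22|record_2
# 3|23|record_3
```

Such a file can then be loaded into a partition, e.g. via Hive's LOAD DATA statement.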
3. Create a measurement job through the UI:

- Select DataAssets.
- Click Measures to open the measure-creation page.
- Choose the source dataset.
- Choose the target dataset.
- Map the two to each other.
- Set the remaining parameters.
- Submit the configuration.
- Add a scheduled job.
- Define its schedule with a cron expression.
- Click DQ Metrics to see the results.
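For reference, an accuracy metric document of the kind the service reads back from Elasticsearch might look roughly like this (the values are invented; only name and tmst are fixed by the index mapping created earlier, and the remaining fields are an assumption about the accuracy measure's output):

```json
{
  "name": "batch_accu",
  "tmst": 1536940800000,
  "total": 125000,
  "miss": 125,
  "matched": 124875
}
```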
6. Griffin Streaming Data in Practice
Again we follow the example from the official site.

The sample stream data looks like this:
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...
The official site also provides test-data scripts at https://griffin.apache.org/data/streaming/ (saved in the resource bundle).

The script continuously writes data into Kafka:
#!/bin/bash
#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target
#every minute
set +e
while true
do
/opt/module/data/gen-data.sh
sleep 90
done
set -e
The Flink part simply consumes from Kafka and forwards the data downstream; a code fragment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> dataStream = env.addSource(kafkaConsumer);
DataStream<String> target = dataStream...; // actual processing logic
target.addSink(new FlinkKafkaProducer010<String>(
        "hadoop101:9092",
        "target",
        new SimpleStringSchema()
));
target.print();
env.execute();
This works together with env.json:
{
"spark": {
"log.level": "WARN",
"checkpoint.dir": "hdfs:///griffin/checkpoint",
"batch.interval": "20s",
"process.interval": "1m",
"init.clear": true,
"config": {
"spark.default.parallelism": 4,
"spark.task.maxFailures": 5,
"spark.streaming.kafkaMaxRatePerPartition": 1000,
"spark.streaming.concurrentJobs": 4,
"spark.yarn.maxAppAttempts": 5,
"spark.yarn.am.attemptFailuresValidityInterval": "1h",
"spark.yarn.max.executor.failures": 120,
"spark.yarn.executor.failuresValidityInterval": "1h",
"spark.hadoop.fs.hdfs.impl.disable.cache": true
}
},
"sinks": [
{
"type": "console"
},
{
"type": "hdfs",
"config": {
"path": "hdfs:///griffin/persist"
}
},
{
"type": "elasticsearch",
"config": {
"method": "post",
"api": "http://es:9200/griffin/accuracy"
}
}
],
"griffin.checkpoint": [
{
"type": "zk",
"config": {
"hosts": "zk:2181",
"namespace": "griffin/infocache",
"lock.path": "lock",
"mode": "persist",
"init.clear": true,
"close.clear": false
}
}
]
}
dq.json
{
"name": "streaming_accu",
"process.type": "streaming",
"data.sources": [
{
"name": "src",
"baseline": true,
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "source",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-5m", "0"],
"updatable": true
}
}, {
"name": "tgt",
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "target",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/target",
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-1m", "0"]
}
}
],
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "accuracy",
"out.dataframe.name": "accu",
"rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
"details": {
"source": "src",
"target": "tgt",
"miss": "miss_count",
"total": "total_count",
"matched": "matched_count"
},
"out":[
{
"type":"metric",
"name": "accu"
},
{
"type":"record",
"name": "missRecords"
}
]
}
]
},
"sinks": ["CONSOLE", "HDFS"]
}
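The accuracy rule above outputs miss_count, total_count, and matched_count; the accuracy percentage shown in the metrics is matched over total. A quick sketch of that arithmetic, with invented counts:

```shell
# accuracy = matched / total, as a percentage; the counts are made-up examples.
matched=124875
total=125000
awk -v m="$matched" -v t="$total" 'BEGIN { printf "%.2f%%\n", 100 * m / t }'
# 99.90%
```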
Submit the job:
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
7. Summary
Data quality management tools are still quite scarce. What Griffin offers is not just an implementation but a way of thinking about data quality management, which is valuable even for teams building their own systems.
In addition, the Data Governance Toolbox knowledge planet has been set up; it is a community focused on putting data governance into practice. Data-governance articles and materials published by Big Data Flow (including paid content) will be synced there long-term. Its goal is to collect materials on practical governance tools and to organize regular hands-on study groups, so the materials are preserved over time and the problem of articles being frequently plagiarized is mitigated. Everyone is welcome to join.
Original: https://www.cnblogs.com/tree1123/p/16481069.html
Author: 独孤风
Title: An Introductory Guide to Apache Griffin, an Open-Source Data Quality Solution