I. Data Collection and Transmission
Concepts covered: Spring Boot, Tomcat, client-side event tracking, JSON logs, Flume (agent, source, channel, sink, custom interceptor, multiplexing channel selector), Kafka (topic, partition, replication)
1. Create topic_start and topic_event to hold the startup logs and the event logs
kafka-topics.sh --zookeeper hadoop10:2181 --topic topic_start \
--create --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper hadoop10:2181 --topic topic_event \
--create --partitions 1 --replication-factor 1
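A quick way to confirm both topics exist before wiring Flume to them, using the same ZooKeeper address as above:
kafka-topics.sh --zookeeper hadoop10:2181 --list
kafka-topics.sh --zookeeper hadoop10:2181 --describe --topic topic_start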
2. Write a custom Flume interceptor that validates (filters) the raw records and tags each one with the Kafka topic it should be published to
package com.yh.interceptor;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MallInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        //An Event carries two fields: a byte[] body holding the collected record,
        //and a Map of headers holding key-value metadata (empty by default).
        //Event: { headers:{} body: 61 61 61 aaa }
        byte[] body = event.getBody(); //raw bytes of the record
        String str = new String(body, Charset.forName("UTF-8")); //decode bytes to a string
        if (StringUtils.isBlank(str)) { //isBlank is true for null, empty, or whitespace-only strings
            return null; //drop bad records by returning null instead of the event
        }
        boolean flag = false;
        Map<String, String> map = event.getHeaders(); //headers drive the topic routing below
        if (str.contains("\"en\":\"start\"")) {
            map.put("topic", "topic_start");
            flag = ETLUtilss.vaildStart(str); //author's validation helper; a hypothetical sketch follows after this class
        } else {
            map.put("topic", "topic_event");
            flag = ETLUtilss.vaildEvent(str);
        }
        if (!flag) {
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> list2 = new ArrayList<>(); //collects the events that survive filtering
        for (Event event : list) {
            Event event1 = intercept(event);
            if (event1 != null) {
                list2.add(event1);
            }
        }
        return list2; //returning null here would silently drop every batch
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MallInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
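The interceptor above calls ETLUtilss.vaildStart and ETLUtilss.vaildEvent, which this post does not include. A hypothetical minimal stand-in (class and method names kept as the author spelled them) that only checks the record looks like a JSON object:
package com.yh.interceptor;

import org.apache.commons.lang.StringUtils;

//hypothetical sketch: the author's real ETLUtilss is not shown and may validate more fields
public class ETLUtilss {
    public static boolean vaildStart(String line) {
        //a start log should be a single JSON object
        return looksLikeJsonObject(line);
    }

    public static boolean vaildEvent(String line) {
        //an event log should also carry a JSON object payload
        return looksLikeJsonObject(line);
    }

    private static boolean looksLikeJsonObject(String line) {
        if (StringUtils.isBlank(line)) {
            return false;
        }
        String trimmed = line.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }
}
The knowledge-point list mentions the agent/source/channel/sink and the multiplexing selector, but the agent file itself is not shown. Below is a minimal sketch of how the interceptor's "topic" header could drive the routing; the log path, channel types, and ports are assumptions, while the interceptor class, header name, and topic names come from the code above:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#tail the app logs (path is an assumption)
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/applog/app.*
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.channels = c1 c2

#the custom interceptor fills the "topic" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yh.interceptor.MallInterceptor$Builder

#multiplexing selector routes on that header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop10:9092
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.channel = c1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers = hadoop10:9092
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.channel = c2
Note that recent Flume versions' KafkaSink also honors the event's "topic" header by default, which agrees with the per-sink kafka.topic values set here.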
The class below is the UDTF registered as flat_analizer in step 3; it explodes the "et" JSON array of a log line into one (event_name, event_json) row per event.
package com.yh;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.List;

public class MallUDTF extends GenericUDTF {
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //1. Names of the two columns produced by the explode
        List<String> fieldNames = new ArrayList<>();
        fieldNames.add("event_name");
        fieldNames.add("event_json");
        //2. Types of those columns (both strings)
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    List<String> list = new ArrayList<>();

    @Override
    public void process(Object[] args) throws HiveException {
        //input is the "et" array: [ {},{} ], i.e. flat_analizer(base_analizer(line,'et'))
        String s = args[0].toString();
        JSONArray jsonArray = new JSONArray(s);
        for (int i = 0; i < jsonArray.length(); i++) {
            list.clear();
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            String en = jsonObject.getString("en"); //event name
            String json = jsonObject.toString();    //full event JSON
            list.add(en);
            list.add(json);
            forward(list); //emit one (event_name, event_json) row
            System.out.println(list);
        }
    }

    @Override
    public void close() throws HiveException {
    }

    //local smoke test with a sample "et" array from the log data
    public static void main(String[] args) throws HiveException {
        MallUDTF malludtf = new MallUDTF();
        //give forward() somewhere to send rows when running outside Hive
        malludtf.setCollector(row -> System.out.println("row: " + row));
        String s = "[{\"ett\":\"1650336734810\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"0\",\"action\":\"3\",\"extend1\":\"\",\"type\":\"3\",\"type1\":\"\",\"loading_way\":\"1\"}},\n" +
                " {\"ett\":\"1650375109091\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}},\n" +
                " {\"ett\":\"1650310705805\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":4,\"addtime\":\"1650328750056\",\"praise_count\":796,\"other_id\":3,\"comment_id\":6,\"reply_count\":52,\"userid\":1,\"content\":\"姓谭帅末操料液故\"}},\n" +
                " {\"ett\":\"1650340480983\",\"en\":\"favorites\",\"kv\":{\"course_id\":6,\"id\":0,\"add_time\":\"1650340150151\",\"userid\":5}}]";
        malludtf.process(new Object[]{s});
    }
}
3. For permanent functions the jar must be uploaded to HDFS (temporary functions can load it from the local filesystem); then create the functions and bind them to the Java classes
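The upload itself might look like this (the local jar path is an assumption; the HDFS target matches the using jar paths below):
hdfs dfs -mkdir -p /user/hive/jars
hdfs dfs -put mall-udtf-1.0-SNAPSHOT.jar /user/hive/jars/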
--Create permanent functions bound to the compiled Java classes. If this fails, run drop function xx, delete the jar, and repeat the steps
create function base_analizer as 'com.yh.MallUDF' using jar
'hdfs://hadoop10:9000/user/hive/jars/mall-udtf-1.0-SNAPSHOT.jar';
create function flat_analizer as 'com.yh.MallUDTF' using jar
'hdfs://hadoop10:9000/user/hive/jars/mall-udtf-1.0-SNAPSHOT.jar';
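If registration succeeded, Hive can describe both functions:
describe function extended base_analizer;
describe function extended flat_analizer;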
4. Use the custom functions to load dwd_base_event_log
insert into table dwd_base_event_log partition(dt='2022-04-01')
select
base_analizer(line,"mid") as mid_id,
base_analizer(line,"uid") as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from ods_event_log
lateral view flat_analizer(base_analizer(line,'et')) tt
as event_name,event_json where dt='2022-04-01'
and base_analizer(line,'et')<>'';
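Before running the full insert, the UDF/UDTF pair can be spot-checked on a few raw rows; this assumes ods_event_log stores each record in a single string column named line, as the query above implies:
select base_analizer(line,'mid') as mid_id,
       base_analizer(line,'et') as et
from ods_event_log
where dt='2022-04-01'
limit 5;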
5. Modify hive-site.xml to replace Hive's underlying execution engine, MR, with Spark
<property>
<name>spark.yarn.jars</name>
<value>hdfs://hadoop10:9000/spark-jars/*</value>
</property>
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
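For the spark.yarn.jars value above to resolve, the Spark runtime jars must already be in HDFS; a sketch, assuming a local Spark installation under /opt/spark (path is an assumption):
hdfs dfs -mkdir /spark-jars
hdfs dfs -put /opt/spark/jars/* /spark-jars/
After restarting the Hive session, set hive.execution.engine; should echo spark.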
6. From dwd_base_event_log, build the dwd-layer product display table, praise table, favorites table, and so on
--dwd-layer product display table
CREATE EXTERNAL TABLE dwd_display_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
action string,
goodsid string,
place string,
extend1 string,
category string,
server_time string
)PARTITIONED BY (dt string);
insert overwrite table dwd_display_log partition(dt='2022-04-01')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action'),
get_json_object(event_json,'$.kv.goodsid'),
get_json_object(event_json,'$.kv.place'),
get_json_object(event_json,'$.kv.extend1'),
get_json_object(event_json,'$.kv.category'),
server_time
from dwd_base_event_log where dt='2022-04-01'
and event_name='display';
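A quick row count verifies the load; the same check applies to the praise and favorites tables below:
select count(*) from dwd_display_log where dt='2022-04-01';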
--dwd-layer praise table
CREATE EXTERNAL TABLE dwd_praise_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
id string,
userid string,
target_id string,
type string,
add_time string,
server_time string
)PARTITIONED BY (dt string);
insert overwrite table dwd_praise_log PARTITION (dt='2022-04-01')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time from dwd_base_event_log
where dt='2022-04-01' and event_name='praise';
--dwd-layer favorites table
CREATE EXTERNAL TABLE dwd_favorites_log(
mid_id string,
user_id string,
version_code string,
version_name string,
lang string,
source string,
os string,
area string,
model string,
brand string,
sdk_version string,
gmail string,
height_width string,
app_time string,
network string,
lng string,
lat string,
id int,
course_id int,
userid int,
add_time string,
server_time string
)PARTITIONED BY (dt string);
insert overwrite table dwd_favorites_log PARTITION (dt='2022-04-01')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time from dwd_base_event_log
where dt='2022-04-01' and event_name='favorites';
--verify the load
select * from dwd_praise_log where dt='2022-04-01';
Original: https://blog.csdn.net/weixin_45887858/article/details/124535166
Author: 宋荣子_
Title: 电商离线数仓(一) (E-commerce Offline Data Warehouse, Part 1)