python实现skywalking的trace模块过滤和报警

skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发。所以自己就用python对skywalking做了二次数据清洗实现。项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:

#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timeimport smtplibfrom email.mime.text import MIMETextimport redef interface_content_filter(trace_id):    '''    对详细日志内容(业务逻辑报错)进行过滤    :param trace_id:    :return: 【1|0】    '''    url = "http://172.16.53.232:50001/query"    params = {        "trace_id": trace_id    }    detail_trace_id_log = requests.request(method="GET",url=url,params=params)    detail_trace_id_log = detail_trace_id_log.text    print(detail_trace_id_log)    print(type(detail_trace_id_log))    with open("blackname_keyword_list","r",encoding="utf-8") as f:        for line in f:            print(line)            result = re.search(line.strip(),detail_trace_id_log)            print(result)            if result != None:                print("哥们匹配到日志黑名单关键字了:%s" % line)                return 0    print("提示:%s不在关键字黑名单中" % trace_id)    return 1def interface_filter(endpointName):    """    设置接口黑名单    :param endpointName:    :return: 【1|0】    """    endpointName = re.sub("\(|\)",".",endpointName)    with open("blackname_list","r",encoding="utf-8") as f:        bn_list = f.read()    match_result = re.search(endpointName.strip(),bn_list)    if match_result == None:        print("提示:接口不存在黑名单中")        return 1    print("提示:接口在黑名单中")    return 0def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr):    """    skywalking trace功能对错误接口进行过滤,默认最大一次获取2000条数据,每分钟执行一次    :param start_time:    :param end_time:    :return:    """    url = sw_url    data = {          "query": "query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) {\n    traces {\n      key: segmentId\n      endpointNames\n      duration\n      start\n      isError\n      traceIds\n    }\n    total\n  }}",          "variables": {            "condition": {              "queryDuration": {                "start": start_time, #"2021-12-07 1734"                "end": end_time,                "step": "MINUTE"              },              "traceState": "ERROR",              "paging": {                "pageNum": 1,                "pageSize": per_page_size,                "needTotal": "true"              },              "queryOrder": "BY_START_TIME"              # "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105"            }          }        }    result = requests.request(method="post",url=url,json=data)    i = 0    # print(result.content)    # print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))))    with open("mail.html","w",encoding="utf-8") as f:        f.write('Title.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}最近15分钟统计:时间持续时长接口名称追踪ID')    for trace in result.json()["data"]["data"]["traces"]:        # print(trace["endpointNames"])        print("时间:%s\n" % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))),              "持续时长:%s\n" % trace["duration"],              "接口名称:%s\n" % trace["endpointNames"][0],              "跟踪ID:%s" % trace["traceIds"][0])        # print(time.localtime(1638869640.194))        i+=1        print(i)        s_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))        dur_time =  trace["duration"]        endpointName = trace["endpointNames"][0]        trace_id = trace["traceIds"][0]        # 调用接口黑名单过滤功能        result = interface_filter(endpointName)        if result == 0:            print("哥们进入黑名单了!",endpointName)            continue        # 调用关键字黑名单过滤功能        keyword_result = interface_content_filter(trace_id)        if keyword_result == 0:            print("哥们进入关键字黑名单了!", trace_id)            continue        with open("mail.html","a",encoding="utf-8") as f:            f.write('%s%s%s%s' %(s_time,dur_time,endpointName,trace_detail_addr,trace_id,trace_id))    with open("mail.html","a",encoding="utf-8") as f:        f.write('')def send_mail(receiver):    """    发送报错接口邮件    :return:    """    server = "mail.test.com"    sender = "sa@test.com"    sender_pwd = "1qaz@WSX"    send_addr = "sa@test.com"    receiver = receiver    with open("mail.html","r",encoding="utf-8") as f:        content = f.read()    if re.search("",content) == None:        print("无报错接口!",content)        return 0    print("邮件前",content)    msg_mail = MIMEText(content,"html","utf-8")    msg_mail["Subject"] = "Skywalking报错接口统计"    msg_mail["From"] = sender    msg_mail["To"] = receiver    server_obj = smtplib.SMTP_SSL(server)    server_obj.connect(server,465)    server_obj.login(sender,sender_pwd)    server_obj.sendmail(send_addr,receiver,msg_mail.as_string())if __name__ == "__main__":    # 设定查询时间间隔,默认900s(15min)    end_time = time.time()    start_time = end_time - 900    start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time))    end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time))    print(start_time)    print(end_time)    sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服务的地址和端口    per_page_size = 5000  #指定一次获取endpoint接口的数目    trace_detail_addr = "127.0.0.1:5000" #指定查询指定trace_id详细日志    receiver = "shy@test.com"  #报警邮件接收人地址    trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr)    send_mail(receiver)    # interface_filter()    # interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")
#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timefrom flask import Flask,requestapp = Flask(__name__)@app.route("/query",methods=["get"])def trace_id_query():    """    查询指定trace_id详细日志信息    :return: f.read()    """    trace_id = request.args.get("trace_id")    url="http://172.16.53.232:9412/graphql"    # url="http://skywalking.roulw.com/graphql"    data = {      "query": "query queryTrace($traceId: ID!) {\n  trace: queryTrace(traceId: $traceId) {\n    spans {\n      traceId\n      segmentId\n      spanId\n      parentSpanId\n      refs {\n        traceId\n        parentSegmentId\n        parentSpanId\n        type\n      }\n      serviceCode\n      serviceInstanceName\n      startTime\n      endTime\n      endpointName\n      type\n      peer\n      component\n      isError\n      layer\n      tags {\n        key\n        value\n      }\n      logs {\n        time\n        data {\n          key\n          value\n        }\n      }\n    }\n  }\n  }",      "variables": {        "traceId": trace_id      }    }    result = requests.request(method="post",url=url,json=data)    with open("detail_log", "w", encoding="utf-8") as f:        f.write("生产Skywalking报错接口跟踪日志日志:")    for trace_id in result.json()["data"]["trace"]["spans"]:        if trace_id["isError"]:            # print(trace_id)            print("服务名称:%s\n" % trace_id["serviceCode"],                  "开始时间:%s\n" % trace_id["startTime"],                  "接口名称:%s\n" % trace_id["endpointName"],                  "peer名称:%s\n" % trace_id["peer"],                  "tags名称:%s\n" % trace_id["tags"],                  "详细日志:%s" % trace_id["logs"])            content = "服务名称:%s开始时间:%s接口名称:%speer名称:%stags名称:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"])            with open("detail_log","a",encoding="utf-8") as f:                f.write(content)                f.write("********详细日志**********")            for logs in trace_id["logs"]:                for log in logs["data"]:                    if log["key"] == "message":                        print(log["value"])                        with open("detail_log","a",encoding="utf-8") as f:                            f.write(log["value"])                        # return log["value"]                    elif log["key"] == "stack":                        print(log["value"])                        with open("detail_log","a",encoding="utf-8") as f:                            f.write(log["value"])            with open("detail_log", "a", encoding="utf-8") as f:                f.write("========下一个接口信息=========")    with open("detail_log","r",encoding="utf-8") as f:        return f.read()if __name__ == "__main__":    # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785"    # trace_id_query(trace_id)    app.run()

Original: https://www.cnblogs.com/shy01/p/15672623.html
Author: Shy01
Title: python实现skywalking的trace模块过滤和报警

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/510572/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球