Skywalking-09:OAL原理——如何通过动态生成的Class类保存数据

OAL 如何通过动态生成的 Class 类,保存数据

前置工作

OAL 如何将动态生成的 SourceDispatcher 添加到 DispatcherManager

    // org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load
    public void load(OALDefine define) throws ModuleStartException {
        if (oalDefineSet.contains(define)) {
            // each oal define will only be activated once
            return;
        }
        try {
            OALEngine engine = loadOALEngine(define);
            // 设置Stream注解监听器,用来处理org.apache.skywalking.oap.server.core.analysis.Stream注解
            StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
            engine.setStreamListener(streamAnnotationListener);

            // org.apache.skywalking.oap.server.core.source.SourceReceiverImpl#getDispatcherDetectorListener
            // 获取的就是org.apache.skywalking.oap.server.core.analysis.DispatcherManager对象
            engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
                                                      .provider()
                                                      .getService(SourceReceiver.class)
                                                      .getDispatcherDetectorListener());

            // 调用的就是 org.apache.skywalking.oal.rt.OALRuntime#start
            engine.start(OALEngineLoaderService.class.getClassLoader());

            // 通知所有的监听器
            engine.notifyAllListeners();

            oalDefineSet.add(define);
        } catch (ReflectiveOperationException | OALCompileException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }

org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load 方法做了如下操作:

  1. 设置 Stream 注解监听器,用来获取指标类的基本信息,并进行相应处理
@Stream(
    name = "instance_jvm_class_loaded_class_count",
    scopeId = 11000,
    builder = InstanceJvmClassLoadedClassCountMetricsBuilder.class,
    processor = MetricsStreamProcessor.class
)
public class InstanceJvmClassLoadedClassCountMetrics extends LongAvgMetrics implements WithMetadata {
    // 省略
}
  1. 通过模块管理器,先获取到 SourceReceiver 对象,借由此对象获取到 DispatcherManager 对象
public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }
}
  1. 启动 OAL 引擎
  2. 通知所有的监听器

org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners

    @Override
    public void notifyAllListeners() throws ModuleStartException {
        for (Class metricsClass : metricsClasses) {
            try {
                // 将动态生成的Metrics添加到MetricsStreamProcessor
                streamAnnotationListener.notify(metricsClass);
            } catch (StorageException e) {
                throw new ModuleStartException(e.getMessage(), e);
            }
        }
        for (Class dispatcherClass : dispatcherClasses) {
            try {
                // 添加动态生成的SourceDispatch至DispatcherManager
                dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
            } catch (Exception e) {
                throw new ModuleStartException(e.getMessage(), e);
            }
        }
    }

org.apache.skywalking.oap.server.core.analysis.DispatcherManager#addIfAsSourceDispatcher

    @Override
    public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
        if (!aClass.isInterface() && !Modifier.isAbstract(
            aClass.getModifiers()) && SourceDispatcher.class.isAssignableFrom(aClass)) {
            Type[] genericInterfaces = aClass.getGenericInterfaces();
            for (Type genericInterface : genericInterfaces) {
                ParameterizedType anInterface = (ParameterizedType) genericInterface;
                if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
                    Type[] arguments = anInterface.getActualTypeArguments();

                    if (arguments.length != 1) {
                        throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
                    }
                    Type argument = arguments[0];

                    Object source = ((Class) argument).newInstance();

                    if (!Source.class.isAssignableFrom(source.getClass())) {
                        throw new UnexpectedException(
                            "unexpected type argument of class " + aClass.getName() + ", should be org.apache.skywalking.oap.server.core.source.Source. ");
                    }

                    Source dispatcherSource = (Source) source;
                    SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();

                    int scopeId = dispatcherSource.scope();

                    // 使用scope做SourceDispatcher Map的key
                    List dispatchers = this.dispatcherMap.get(scopeId);
                    if (dispatchers == null) {
                        dispatchers = new ArrayList<>();
                        this.dispatcherMap.put(scopeId, dispatchers);
                    }
                    // 添加
                    dispatchers.add(dispatcher);

                    LOGGER.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass()
                                                                                                .getName(), scopeId);
                }
            }
        }
    }

OAL 如何将动态生成的 Metrics 添加到 MetricsStreamProcessor

与” OAL 如何将动态生成的 SourceDispatcher 添加到 DispatcherManager “流程基本一致,都是在 org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners 方法中处理的

    @Override
    public void notifyAllListeners() throws ModuleStartException {
        for (Class metricsClass : metricsClasses) {
            try {
                // 将动态生成的Metrics添加到MetricsStreamProcessor
                streamAnnotationListener.notify(metricsClass);
            } catch (StorageException e) {
                throw new ModuleStartException(e.getMessage(), e);
            }
        }
        for (Class dispatcherClass : dispatcherClasses) {
            try {
                // 添加动态生成的SourceDispatch至DispatcherManager
                dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
            } catch (Exception e) {
                throw new ModuleStartException(e.getMessage(), e);
            }
        }
    }

org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener#notify

    @Override
    public void notify(Class aClass) throws StorageException {
        if (aClass.isAnnotationPresent(Stream.class)) {
            Stream stream = (Stream) aClass.getAnnotation(Stream.class);

            if (stream.processor().equals(RecordStreamProcessor.class)) {
                RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                // 因为所有的Metrics类上的@Stream注解的processor = MetricsStreamProcessor.class,所以只会走该分支
                MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(NoneStreamProcessor.class)) {
                NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(ManagementStreamProcessor.class)) {
                ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else {
                throw new UnexpectedException("Unknown stream processor.");
            }
        } else {
            throw new UnexpectedException(
                    "Stream annotation listener could only parse the class present stream annotation.");
        }
    }

org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中,通过一系列的处理,最后将 Worker (处理器)放入 map 中,等待后续被使用

    /**
     * Create the workers and work flow for every metrics.

     *
     * @param moduleDefineHolder pointer of the module define.

     * @param stream             definition of the metrics class.

     * @param metricsClass       data type of the streaming calculation.

     */
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class metricsClass) throws StorageException {
        this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder,
                       StreamDefinition stream,
                       Class metricsClass) throws StorageException {
        if (DisableRegister.INSTANCE.include(stream.getName())) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IMetricsDAO metricsDAO;
        try {
            // 获取@Stream注解上的builder类,并创建Metrics存储DAO对象
            metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
        }

        ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
        DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
                                                                    .provider()
                                                                    .getService(DownSamplingConfigService.class);

        MetricsPersistentWorker hourPersistentWorker = null;
        MetricsPersistentWorker dayPersistentWorker = null;

        MetricsTransWorker transWorker = null;

        final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
        /**
         * All metrics default are supportDownSampling and insertAndUpdate, unless it has explicit definition.

         */
        boolean supportDownSampling = true;
        boolean supportUpdate = true;
        if (metricsExtension != null) {
            supportDownSampling = metricsExtension.supportDownSampling();
            supportUpdate = metricsExtension.supportUpdate();
        }
        if (supportDownSampling) {
            if (configService.shouldToHour()) {
                Model model = modelSetter.add(
                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
                hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
            }
            if (configService.shouldToDay()) {
                Model model = modelSetter.add(
                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
                dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
            }

            transWorker = new MetricsTransWorker(
                moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
        }

        Model model = modelSetter.add(
            metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);

        String remoteReceiverWorkerName = stream.getName() + "_rec";
        IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
                                                                       .provider()
                                                                       .getService(IWorkerInstanceSetter.class);
        workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);

        MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
        MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
            moduleDefineHolder, remoteWorker, stream.getName());

        // private Map, MetricsAggregateWorker> entryWorkers = new HashMap<>();
        // 将指标类的Class与MetricsAggregateWorker放入map中
        // 当需要处理指标数据时,从map中获取即可
        entryWorkers.put(metricsClass, aggregateWorker);
    }

SourceReceiver 处理 Source 相关流程

在”从一个案例开始分析 OAL 原理”一节,聊到了 oap server 将从 agent 收到的指标信息,发送至 SourceReceive
对应的坐标是: org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatcher#sendToClassMetricProcess

    private void sendToClassMetricProcess(String service,
            String serviceId,
            String serviceInstance,
            String serviceInstanceId,
            long timeBucket,
            Class clazz) {
        // 拼装Source对象
        ServiceInstanceJVMClass serviceInstanceJVMClass = new ServiceInstanceJVMClass();
        serviceInstanceJVMClass.setId(serviceInstanceId);
        serviceInstanceJVMClass.setName(serviceInstance);
        serviceInstanceJVMClass.setServiceId(serviceId);
        serviceInstanceJVMClass.setServiceName(service);
        serviceInstanceJVMClass.setLoadedClassCount(clazz.getLoadedClassCount());
        serviceInstanceJVMClass.setUnloadedClassCount(clazz.getUnloadedClassCount());
        serviceInstanceJVMClass.setTotalLoadedClassCount(clazz.getTotalLoadedClassCount());
        serviceInstanceJVMClass.setTimeBucket(timeBucket);
        // 将Source对象发送至SourceReceive进行处理
        sourceReceiver.receive(serviceInstanceJVMClass);
    }

SourceReceiver 的默认实现类 org.apache.skywalking.oap.server.core.source.SourceReceiverImpl ,将收集到的指标通过 org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward 进行分发

package org.apache.skywalking.oap.server.core.source;

import java.io.IOException;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;

public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    public SourceReceiverImpl() {
        this.dispatcherManager = new DispatcherManager();
    }

    @Override
    public void receive(Source source) {
        // 通过调配器管理器进行转发
        dispatcherManager.forward(source);
    }

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }

    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        dispatcherManager.scan();
    }
}
    // org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward
    public void forward(Source source) {
        if (source == null) {
            return;
        }
        // 通过source的scope找到对应的调度器
        List dispatchers = dispatcherMap.get(source.scope());

        /**
         * Dispatcher is only generated by oal script analysis result.

         * So these will/could be possible, the given source doesn't have the dispatcher,
         * when the receiver is open, and oal script doesn't ask for analysis.

         */
        if (dispatchers != null) {
            source.prepare();
            // 调度器进行分发,OAL动态生成的调度器,也会在这进行分发
            for (SourceDispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(source);
            }
        }
    }

MetricsStreamProcessor 如何处理 SourceDispatcher 发送过来的指标数据

完整代码请见” OAL 如何动态生成 Class 类”下”案例”一节

org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher.ServiceInstanceJVMClassDispatcher#doInstanceJvmClassLoadedClassCount 发送数据至 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor

package org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher;

import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMClass;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.oal.rt.metrics.InstanceJvmClassLoadedClassCountMetrics;

public class ServiceInstanceJVMClassDispatcher implements SourceDispatcher {
    private void doInstanceJvmClassLoadedClassCount(ServiceInstanceJVMClass var1) {
        InstanceJvmClassLoadedClassCountMetrics var2 = new InstanceJvmClassLoadedClassCountMetrics();
        var2.setTimeBucket(var1.getTimeBucket());
        var2.setEntityId(var1.getEntityId());
        var2.setServiceId(var1.getServiceId());
        var2.combine(var1.getLoadedClassCount(), (long)1);
        // 发送数据到指标流处理器
        MetricsStreamProcessor.getInstance().in(var2);
    }

    public void dispatch(Source var1) {
        ServiceInstanceJVMClass var2 = (ServiceInstanceJVMClass)var1;
        this.doInstanceJvmClassLoadedClassCount(var2);
    }

    public ServiceInstanceJVMClassDispatcher() {
    }
}

org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#in 方法中,使用在 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中创建的 Worker 对象,保存数据

    public void in(Metrics metrics) {
        MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
        if (worker != null) {
            worker.in(metrics);
        }
    }

PS:内部再细节一些的数据处理流程,相关的关键字有: DataCarrierWorkerStorageModule ,暂且不表,不是这篇文章的内容。

总结

Skywalking Metrics处理流程

Skywalking-09:OAL原理——如何通过动态生成的Class类保存数据

参考文档

分享并记录所学所见

Original: https://www.cnblogs.com/switchvov/p/15361540.html
Author: switchvov
Title: Skywalking-09:OAL原理——如何通过动态生成的Class类保存数据

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/572866/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 一起来学习Android自定义控件1

    概述 Android已经为我们提供了大量的View供我们使用,但是可能有时候这些组件不能满足我们的需求,这时候就需要自定义控件了。自定义控件对于初学者总是感觉是一种复杂的技术。因为…

    Java 2023年6月13日
    0183
  • 模拟tomcat服务器,sun公司,webapp开发者

    模拟tomcat服务器,sun公司,webapp开发者 首先我们思考一下一个动态web应用需要哪些角色参与,角色与角色之间又有多少协议? 1.有4种角色,分别是(浏览器开发团队[如…

    Java 2023年6月5日
    077
  • Day13

    package com.oop.demo05;//在java中所有的类都直接或者间接默认继承object//人 父类public class Person { //public 公…

    Java 2023年6月5日
    0111
  • 概率神经网络 (PNN) 应用的简单DEMO

    概率神经网络的全称是Probabilistic neural network,它主要用于模式分类,是基于贝叶斯策略前馈神经网络。它有着坚实的数学理论基础,当然本文并不打算从数学符号…

    Java 2023年6月15日
    091
  • 1

    1 posted @2022-09-25 23:44 山海自有归期 阅读(21 ) 评论() 编辑 Original: https://www.cnblogs.com/ljq202…

    Java 2023年6月7日
    085
  • NGINX转发端口后却跳转到80端口的解决方法

    问题原因: nginx没有正确的把端口信息传送到后端,没能正确的配置nginx,下面这行是关键 proxy_set_header Host $host:$server_port; …

    Java 2023年5月30日
    071
  • vim编辑器命令

    参考: https://blog.csdn.net/weixin_44191814/article/details/120091363 vim编辑器 ## Vim基本模式 【对文件…

    Java 2023年6月6日
    0118
  • Centos部署Loki日志聚合系统

    关于一些日志聚合由来及原理科普见我的另外一篇 《编程入门之日志聚合系统》 https://www.cnblogs.com/uncleguo/p/15948763.html Loki…

    Java 2023年6月15日
    083
  • Redis分布式锁实现Redisson 15问

    大家好,我是三友。 在一个分布式系统中,由于涉及到多个实例同时对同一个资源加锁的问题,像传统的synchronized、ReentrantLock等单进程情况加锁的api就不再适用…

    Java 2023年6月16日
    076
  • RabbitMQ——消息确认机制(手动/自动)

    非常好理解的推文 报错: Only one ConfirmCallback is supported by each RabbitTemplate 解决方法: Original: …

    Java 2023年5月30日
    094
  • eventBus for Flutter & Dart

    Eventbus框架,适用于Flutter,Dart项目 Architecture Pub Usage with 3 step Using Vip Event Sometimes …

    Java 2023年5月29日
    062
  • Springcloud学习笔记45–Java中获取当前服务器的Ip地址

    获取本地的Ip地址: InetAddress.getLocalHost().getHostAddress()在windows下没问题,在linux下是根据主机名在hosts文件对应…

    Java 2023年5月29日
    085
  • c#反射

    待总结 posted @2015-03-19 12:58 zhepama 阅读(134 ) 评论() 编辑 Original: https://www.cnblogs.com/zh…

    Java 2023年5月30日
    0101
  • 操作系统(一)—— 进程同步

    下一篇章打算讲解 AQS ,在去熟悉 AQS 原理之前,我想我们得知道一个事物是因何而产生,它出现的动机;AQS 作为一种高级的同步机制,讨论 AQS 则避免不了谈及操作系统以及 …

    Java 2023年6月9日
    081
  • 基于Python来识别处理照片里的条形码

    最近一直在玩数独,突发奇想实现图像识别求解数独,输入到输出平均需要0.5s。 整体思路大概就是识别出图中数字生成list,然后求解。 输入输出demo 数独采用的是微软自带的Mic…

    Java 2023年6月7日
    082
  • 从Spring中学到的【1】–读懂继承链

    最近看了一些 Spring 源码,发现源码分析的文章很多,而底层思想分析的文章比较少,这个系列文章准备总结一下Spring中给我的启示,包括设计模式思想、SOLID设计原则等,涉及…

    Java 2023年6月16日
    099
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球