Skywalking-09: OAL Internals, How Data Is Saved Through Dynamically Generated Classes
How OAL saves data through dynamically generated classes
Preliminary work
How OAL adds the dynamically generated SourceDispatcher to DispatcherManager
// org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load
public void load(OALDefine define) throws ModuleStartException {
    if (oalDefineSet.contains(define)) {
        // each oal define will only be activated once
        return;
    }
    try {
        OALEngine engine = loadOALEngine(define);
        // Set the Stream annotation listener, which handles the
        // org.apache.skywalking.oap.server.core.analysis.Stream annotation
        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
        engine.setStreamListener(streamAnnotationListener);
        // org.apache.skywalking.oap.server.core.source.SourceReceiverImpl#getDispatcherDetectorListener
        // returns the org.apache.skywalking.oap.server.core.analysis.DispatcherManager object
        engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
                                                  .provider()
                                                  .getService(SourceReceiver.class)
                                                  .getDispatcherDetectorListener());
        // This calls org.apache.skywalking.oal.rt.OALRuntime#start
        engine.start(OALEngineLoaderService.class.getClassLoader());
        // Notify all listeners
        engine.notifyAllListeners();
        oalDefineSet.add(define);
    } catch (ReflectiveOperationException | OALCompileException e) {
        throw new ModuleStartException(e.getMessage(), e);
    }
}
The org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load method does the following:
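It sets the Stream annotation listener, which reads the basic information of each metrics class from its @Stream annotation and processes the class accordingly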
@Stream(
    name = "instance_jvm_class_loaded_class_count",
    scopeId = 11000,
    builder = InstanceJvmClassLoadedClassCountMetricsBuilder.class,
    processor = MetricsStreamProcessor.class
)
public class InstanceJvmClassLoadedClassCountMetrics extends LongAvgMetrics implements WithMetadata {
    // omitted
}
Through the module manager, it first obtains the SourceReceiver object, and through that object it obtains the DispatcherManager object
public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }
}
It starts the OAL engine
It notifies all listeners
org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners
@Override
public void notifyAllListeners() throws ModuleStartException {
    for (Class metricsClass : metricsClasses) {
        try {
            // Add the dynamically generated Metrics to MetricsStreamProcessor
            streamAnnotationListener.notify(metricsClass);
        } catch (StorageException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
    for (Class dispatcherClass : dispatcherClasses) {
        try {
            // Add the dynamically generated SourceDispatcher to DispatcherManager
            dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
}
org.apache.skywalking.oap.server.core.analysis.DispatcherManager#addIfAsSourceDispatcher
@Override
public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
    if (!aClass.isInterface() && !Modifier.isAbstract(aClass.getModifiers())
        && SourceDispatcher.class.isAssignableFrom(aClass)) {
        Type[] genericInterfaces = aClass.getGenericInterfaces();
        for (Type genericInterface : genericInterfaces) {
            ParameterizedType anInterface = (ParameterizedType) genericInterface;
            if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
                Type[] arguments = anInterface.getActualTypeArguments();

                if (arguments.length != 1) {
                    throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
                }
                Type argument = arguments[0];

                Object source = ((Class) argument).newInstance();

                if (!Source.class.isAssignableFrom(source.getClass())) {
                    throw new UnexpectedException(
                        "unexpected type argument of class " + aClass.getName()
                            + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
                }

                Source dispatcherSource = (Source) source;
                SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();

                int scopeId = dispatcherSource.scope();

                // The scope id is the key of the SourceDispatcher map
                List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId);
                if (dispatchers == null) {
                    dispatchers = new ArrayList<>();
                    this.dispatcherMap.put(scopeId, dispatchers);
                }

                // Register the dispatcher
                dispatchers.add(dispatcher);

                LOGGER.info("Dispatcher {} is added into DefaultScopeDefine {}.",
                            dispatcher.getClass().getName(), scopeId);
            }
        }
    }
}
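The registration step above boils down to two things: reading the type argument of the SourceDispatcher interface through reflection, and using the scope of that source type as the map key. Below is a minimal, self-contained sketch of that idea. All types in it (DispatcherRegistrySketch, the nested Source and SourceDispatcher interfaces, JvmClassSource, JvmClassDispatcher, dispatcherCount) are hypothetical stand-ins written for illustration, not the SkyWalking classes, and the scope value 11000 is just an example value.

```java
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a dispatcher registry; not the SkyWalking implementation.
class DispatcherRegistrySketch {
    interface Source { int scope(); }
    interface SourceDispatcher<S extends Source> { void dispatch(S source); }

    // Example source type; the scope id is an illustrative value.
    static class JvmClassSource implements Source {
        @Override public int scope() { return 11000; }
    }

    // Example dispatcher bound to JvmClassSource through its type argument.
    static class JvmClassDispatcher implements SourceDispatcher<JvmClassSource> {
        @Override public void dispatch(JvmClassSource source) { }
    }

    private final Map<Integer, List<SourceDispatcher<?>>> dispatcherMap = new HashMap<>();

    // Registers aClass if it is a concrete SourceDispatcher; ignores it otherwise.
    void addIfAsSourceDispatcher(Class<?> aClass) {
        if (aClass.isInterface() || Modifier.isAbstract(aClass.getModifiers())
            || !SourceDispatcher.class.isAssignableFrom(aClass)) {
            return;
        }
        for (Type type : aClass.getGenericInterfaces()) {
            if (!(type instanceof ParameterizedType)) {
                continue; // raw or non-generic interface, nothing to inspect
            }
            ParameterizedType parameterized = (ParameterizedType) type;
            if (parameterized.getRawType() != SourceDispatcher.class) {
                continue;
            }
            try {
                // The single type argument names the source type this dispatcher handles.
                Class<?> sourceClass = (Class<?>) parameterized.getActualTypeArguments()[0];
                Source source = (Source) sourceClass.getDeclaredConstructor().newInstance();
                SourceDispatcher<?> dispatcher =
                    (SourceDispatcher<?>) aClass.getDeclaredConstructor().newInstance();
                // The scope id of the source is the map key.
                dispatcherMap.computeIfAbsent(source.scope(), k -> new ArrayList<>()).add(dispatcher);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + aClass.getName(), e);
            }
        }
    }

    int dispatcherCount(int scopeId) {
        List<SourceDispatcher<?>> dispatchers = dispatcherMap.get(scopeId);
        return dispatchers == null ? 0 : dispatchers.size();
    }

    public static void main(String[] args) {
        DispatcherRegistrySketch registry = new DispatcherRegistrySketch();
        registry.addIfAsSourceDispatcher(JvmClassDispatcher.class);
        System.out.println(registry.dispatcherCount(11000)); // prints 1
    }
}
```

Unlike the listing above, the sketch checks for ParameterizedType before casting and uses getDeclaredConstructor().newInstance(); these are choices made for the sketch only.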
How OAL adds the dynamically generated Metrics to MetricsStreamProcessor
The flow is essentially the same as in "How OAL adds the dynamically generated SourceDispatcher to DispatcherManager": both are handled in the org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners method
org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener#notify
@Override
public void notify(Class aClass) throws StorageException {
    if (aClass.isAnnotationPresent(Stream.class)) {
        Stream stream = (Stream) aClass.getAnnotation(Stream.class);

        if (stream.processor().equals(RecordStreamProcessor.class)) {
            RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
            // Every Metrics class has processor = MetricsStreamProcessor.class in its @Stream
            // annotation, so Metrics classes always take this branch
            MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(TopNStreamProcessor.class)) {
            TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(NoneStreamProcessor.class)) {
            NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(ManagementStreamProcessor.class)) {
            ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else {
            throw new UnexpectedException("Unknown stream processor.");
        }
    } else {
        throw new UnexpectedException(
            "Stream annotation listener could only parse the class present stream annotation.");
    }
}
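The listener works only because the annotation is readable at runtime and carries the processor class as an attribute. The following sketch shows that mechanism in isolation, under stated assumptions: the nested Stream annotation, the Processor types, and StreamListenerSketch are hypothetical simplifications defined here, not the SkyWalking annotation or processors. The one hard requirement it illustrates is RetentionPolicy.RUNTIME; without it getAnnotation would return null.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of annotation-driven routing; not the SkyWalking listener.
class StreamListenerSketch {
    interface Processor { }
    static class MetricsProcessor implements Processor { }
    static class RecordProcessor implements Processor { }

    // RUNTIME retention is what makes the annotation visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Stream {
        String name();
        Class<? extends Processor> processor();
    }

    @Stream(name = "instance_jvm_class_loaded_class_count", processor = MetricsProcessor.class)
    static class LoadedClassCountMetrics { }

    static class PlainClass { }

    // Stream names routed to each (hypothetical) processor.
    final List<String> metricsStreams = new ArrayList<>();
    final List<String> recordStreams = new ArrayList<>();

    // Reads the annotation and routes the class by its declared processor.
    void notifyClass(Class<?> aClass) {
        Stream stream = aClass.getAnnotation(Stream.class);
        if (stream == null) {
            throw new IllegalArgumentException(aClass.getName() + " has no @Stream annotation");
        }
        if (stream.processor().equals(MetricsProcessor.class)) {
            metricsStreams.add(stream.name());
        } else if (stream.processor().equals(RecordProcessor.class)) {
            recordStreams.add(stream.name());
        } else {
            throw new IllegalStateException("Unknown stream processor.");
        }
    }

    public static void main(String[] args) {
        StreamListenerSketch listener = new StreamListenerSketch();
        listener.notifyClass(LoadedClassCountMetrics.class);
        System.out.println(listener.metricsStreams);
    }
}
```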
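In org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create, after the DAO, the persistence workers, and the remote worker have been set up, the entry Worker (a MetricsAggregateWorker) is put into the entryWorkers map, keyed by the metrics class, where it waits to be used later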
/**
 * Create the workers and work flow for every metrics.
 *
 * @param moduleDefineHolder pointer of the module define.
 * @param stream             definition of the metrics class.
 * @param metricsClass       data type of the streaming calculation.
 */
public void create(ModuleDefineHolder moduleDefineHolder,
                   Stream stream,
                   Class<? extends Metrics> metricsClass) throws StorageException {
    this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
}

@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder,
                   StreamDefinition stream,
                   Class<? extends Metrics> metricsClass) throws StorageException {
    if (DisableRegister.INSTANCE.include(stream.getName())) {
        return;
    }

    StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
    IMetricsDAO metricsDAO;
    try {
        // Take the builder class from the @Stream annotation and create the metrics storage DAO
        metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
    } catch (InstantiationException | IllegalAccessException e) {
        throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
    }

    ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
    DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
                                                                .provider()
                                                                .getService(DownSamplingConfigService.class);

    MetricsPersistentWorker hourPersistentWorker = null;
    MetricsPersistentWorker dayPersistentWorker = null;

    MetricsTransWorker transWorker = null;

    final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
    /**
     * All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.
     */
    boolean supportDownSampling = true;
    boolean supportUpdate = true;
    if (metricsExtension != null) {
        supportDownSampling = metricsExtension.supportDownSampling();
        supportUpdate = metricsExtension.supportUpdate();
    }
    if (supportDownSampling) {
        if (configService.shouldToHour()) {
            Model model = modelSetter.add(
                metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
            hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
        }
        if (configService.shouldToDay()) {
            Model model = modelSetter.add(
                metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
            dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
        }

        transWorker = new MetricsTransWorker(
            moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
    }

    Model model = modelSetter.add(
        metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
    MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
        moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);

    String remoteReceiverWorkerName = stream.getName() + "_rec";
    IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
                                                                   .provider()
                                                                   .getService(IWorkerInstanceSetter.class);
    workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);

    MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
    MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
        moduleDefineHolder, remoteWorker, stream.getName());

    // private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
    // Put the metrics Class and its MetricsAggregateWorker into the map;
    // when metrics data needs to be processed, the worker is looked up from this map
    entryWorkers.put(metricsClass, aggregateWorker);
}
How SourceReceiver processes a Source
The section "Analyzing the OAL internals, starting from a case" mentioned that the OAP server sends the metrics information it receives from the agent to the SourceReceiver
The corresponding code location is: org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatcher#sendToClassMetricProcess
private void sendToClassMetricProcess(String service,
                                      String serviceId,
                                      String serviceInstance,
                                      String serviceInstanceId,
                                      long timeBucket,
                                      Class clazz) {
    // Assemble the Source object
    ServiceInstanceJVMClass serviceInstanceJVMClass = new ServiceInstanceJVMClass();
    serviceInstanceJVMClass.setId(serviceInstanceId);
    serviceInstanceJVMClass.setName(serviceInstance);
    serviceInstanceJVMClass.setServiceId(serviceId);
    serviceInstanceJVMClass.setServiceName(service);
    serviceInstanceJVMClass.setLoadedClassCount(clazz.getLoadedClassCount());
    serviceInstanceJVMClass.setUnloadedClassCount(clazz.getUnloadedClassCount());
    serviceInstanceJVMClass.setTotalLoadedClassCount(clazz.getTotalLoadedClassCount());
    serviceInstanceJVMClass.setTimeBucket(timeBucket);
    // Send the Source object to the SourceReceiver for processing
    sourceReceiver.receive(serviceInstanceJVMClass);
}
org.apache.skywalking.oap.server.core.source.SourceReceiverImpl, the default implementation of SourceReceiver, dispatches the collected metrics through org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward
package org.apache.skywalking.oap.server.core.source;

import java.io.IOException;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;

public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    public SourceReceiverImpl() {
        this.dispatcherManager = new DispatcherManager();
    }

    @Override
    public void receive(Source source) {
        // Forward through the dispatcher manager
        dispatcherManager.forward(source);
    }

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }

    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        dispatcherManager.scan();
    }
}
// org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward
public void forward(Source source) {
    if (source == null) {
        return;
    }

    // Find the dispatchers registered for the scope of this source
    List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());

    /**
     * Dispatcher is only generated by oal script analysis result.
     * So these will/could be possible, the given source doesn't have the dispatcher,
     * when the receiver is open, and oal script doesn't ask for analysis.
     */
    if (dispatchers != null) {
        source.prepare();
        // Each dispatcher handles the source; dispatchers generated dynamically by OAL are invoked here too
        for (SourceDispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(source);
        }
    }
}
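To make the forwarding behaviour concrete, here is a small sketch of scope-keyed fan-out: every dispatcher registered for a scope receives the source, and a source whose scope has no dispatcher is dropped silently. ForwardSketch, its nested interfaces, the register helper, and the int return value of forward are assumptions made for this illustration; the real forward method returns void.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of scope-keyed forwarding; not the SkyWalking DispatcherManager.
class ForwardSketch {
    interface Source {
        int scope();
        default void prepare() { } // hook called once before dispatching
    }

    interface SourceDispatcher { void dispatch(Source source); }

    private final Map<Integer, List<SourceDispatcher>> dispatcherMap = new HashMap<>();

    // Hypothetical helper: registers a dispatcher under an explicit scope id.
    void register(int scopeId, SourceDispatcher dispatcher) {
        dispatcherMap.computeIfAbsent(scopeId, k -> new ArrayList<>()).add(dispatcher);
    }

    // Returns how many dispatchers were invoked (returned only to make the sketch observable).
    int forward(Source source) {
        if (source == null) {
            return 0;
        }
        List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());
        if (dispatchers == null) {
            return 0; // no dispatcher for this scope: the source is ignored
        }
        source.prepare();
        for (SourceDispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(source);
        }
        return dispatchers.size();
    }

    public static void main(String[] args) {
        ForwardSketch manager = new ForwardSketch();
        manager.register(1, source -> System.out.println("dispatcher A, scope " + source.scope()));
        manager.register(1, source -> System.out.println("dispatcher B, scope " + source.scope()));
        System.out.println(manager.forward(() -> 1)); // both dispatchers run, then prints 2
        System.out.println(manager.forward(() -> 2)); // prints 0
    }
}
```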
How MetricsStreamProcessor processes the metrics data sent by SourceDispatcher
For the full code, see the "Case" section under "How OAL dynamically generates classes"
org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher.ServiceInstanceJVMClassDispatcher#doInstanceJvmClassLoadedClassCount sends the data to org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor
package org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher;

import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMClass;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.oal.rt.metrics.InstanceJvmClassLoadedClassCountMetrics;

public class ServiceInstanceJVMClassDispatcher implements SourceDispatcher<ServiceInstanceJVMClass> {
    private void doInstanceJvmClassLoadedClassCount(ServiceInstanceJVMClass var1) {
        InstanceJvmClassLoadedClassCountMetrics var2 = new InstanceJvmClassLoadedClassCountMetrics();
        var2.setTimeBucket(var1.getTimeBucket());
        var2.setEntityId(var1.getEntityId());
        var2.setServiceId(var1.getServiceId());
        var2.combine(var1.getLoadedClassCount(), (long)1);
        // Send the data to the metrics stream processor
        MetricsStreamProcessor.getInstance().in(var2);
    }

    public void dispatch(Source var1) {
        ServiceInstanceJVMClass var2 = (ServiceInstanceJVMClass)var1;
        this.doInstanceJvmClassLoadedClassCount(var2);
    }

    public ServiceInstanceJVMClassDispatcher() {
    }
}
The org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#in method saves the data using the Worker object that was created in org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create
public void in(Metrics metrics) {
    MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
    if (worker != null) {
        worker.in(metrics);
    }
}
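The pairing of create and in is a plain class-keyed registry: create stores one entry worker per metrics class, and in looks it up by the runtime class of the incoming object. The sketch below reduces it to that, with hypothetical types (EntryWorkerSketch, Metrics, Worker, and the two metrics classes are all made up here; the real create method does far more work before the put).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the class-keyed entry worker map; not MetricsStreamProcessor itself.
class EntryWorkerSketch {
    static abstract class Metrics { long value; }
    static class LoadedClassCountMetrics extends Metrics { }
    static class UnregisteredMetrics extends Metrics { }

    // Stand-in for the entry worker: it only records what it receives.
    static class Worker {
        final List<Metrics> received = new ArrayList<>();
        void in(Metrics metrics) { received.add(metrics); }
    }

    private final Map<Class<? extends Metrics>, Worker> entryWorkers = new HashMap<>();

    // Registration time: one worker per metrics class.
    Worker create(Class<? extends Metrics> metricsClass) {
        Worker worker = new Worker();
        entryWorkers.put(metricsClass, worker);
        return worker;
    }

    // Processing time: look the worker up by the exact runtime class.
    void in(Metrics metrics) {
        Worker worker = entryWorkers.get(metrics.getClass());
        if (worker != null) {
            worker.in(metrics);
        }
    }

    public static void main(String[] args) {
        EntryWorkerSketch processor = new EntryWorkerSketch();
        Worker worker = processor.create(LoadedClassCountMetrics.class);
        processor.in(new LoadedClassCountMetrics());
        processor.in(new UnregisteredMetrics()); // no worker registered, silently dropped
        System.out.println(worker.received.size()); // prints 1
    }
}
```

Because the key is metrics.getClass(), the lookup matches only the exact class that was registered; an instance of a subclass of a registered class would not find a worker in this sketch.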
PS: The finer-grained internal data processing flow involves DataCarrier, Worker, and StorageModule. It is outside the scope of this article and is not covered here.