3.3 Trino Secondary Development: Dynamic Data Source Management, Source Code Walkthrough (Part 3)

Contents

Preface

1. Classes Involved in Catalog Creation

2. updateConnectorIds

getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)

ServiceAnnouncement

Announcer

DiscoveryAnnouncementClient

Flow Summary

Summary


Preface

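The previous article described how to add a REST API endpoint to Trino and walked through the process from Trino loading catalog configuration to creating catalogs. For reasons of length, the updateConnectorIds part of the source code was not covered there.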

This article covers two topics:

1) An overview of the classes involved in Trino catalog creation

2) An investigation of updateConnectorIds


1. Classes Involved in Catalog Creation

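1. The catalog creation flow is shown in the figure below.

2. Several important classes are involved in this process; they are introduced below.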

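  • CatalogName: identifies a catalog. Three kinds of catalog name are distinguished: the catalog itself, and the internal names prefixed with $info_schema@ and $system@.
  • Catalog: holds CatalogName and Connector objects, one of each for the three kinds: the catalog itself, informationSchema, and systemTables.
  • CatalogManager: the container class that manages Catalog instances.
  • Connector: an interface in the SPI package that defines the methods needed to connect to a data source, such as index access, page-based data processing, and result retrieval. It is the standard that connectors for different kinds of data sources implement.
  • MaterializedConnector: an inner class of ConnectorManager that bundles everything a connector needs, for example ConnectorSplitManager for managing splits, ConnectorPageSourceProvider and ConnectorPageSinkProvider for reading and writing pages, ConnectorIndexProvider for indexes, and ConnectorNodePartitioningProvider for node partitioning, along with the various PropertyMetadata definitions such as table properties.
  • ConnectorFactory: an interface in the SPI package that defines create, which creates the connector, and getHandleResolver, which returns its handle resolver. It is the standard for adding support for new kinds of data sources.
  • ConnectorManager: the container class that manages connectors. It holds many of the key classes used to interact with data sources, such as MetadataManager, SplitManager, HandleResolver, TransactionManager, and EventListenerManager, and provides important methods such as createCatalog() and createConnector().
  • HandleResolver: a container that maps each catalog to the handle resolver of its data source, similar in role to the container in Spring Boot that maps URLs to their handler methods.
  • EventListenerManager: the class that manages events, dispatching them to the registered event listeners.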
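To make the CatalogName entry more concrete, here is a minimal sketch of how one catalog name yields the three names a Catalog carries. The two prefixes come from the description above; the class CatalogNamesSketch and its methods are hypothetical helpers for illustration, not Trino's CatalogName API.

```java
import java.util.List;

// Hypothetical illustration: CatalogNamesSketch is NOT Trino's CatalogName class.
// It only shows how the two internal names are derived from a catalog name
// using the "$info_schema@" and "$system@" prefixes.
final class CatalogNamesSketch
{
    static final String INFORMATION_SCHEMA_PREFIX = "$info_schema@";
    static final String SYSTEM_TABLES_PREFIX = "$system@";

    private CatalogNamesSketch() {}

    // Name used for the catalog's information_schema connector
    static String informationSchemaName(String catalog)
    {
        return INFORMATION_SCHEMA_PREFIX + catalog;
    }

    // Name used for the catalog's system-tables connector
    static String systemTablesName(String catalog)
    {
        return SYSTEM_TABLES_PREFIX + catalog;
    }

    // True for the two internal variants, false for the user-visible catalog name
    static boolean isInternal(String name)
    {
        return name.startsWith(INFORMATION_SCHEMA_PREFIX) || name.startsWith(SYSTEM_TABLES_PREFIX);
    }

    // The three names associated with one Catalog: itself, information_schema, system tables
    static List<String> allNames(String catalog)
    {
        return List.of(catalog, informationSchemaName(catalog), systemTablesName(catalog));
    }

    public static void main(String[] args)
    {
        // Prints [mysql, $info_schema@mysql, $system@mysql]
        System.out.println(allNames("mysql"));
    }
}
```

Keeping the internal variants as prefixed names means they can never collide with a user-defined catalog name, since ordinary catalog names do not start with "$".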

2. updateConnectorIds

We previously examined the line injector.getInstance(StaticCatalogStore.class).loadCatalogs() in depth. The second important piece of code is updateConnectorIds:

// TODO: remove this huge hack
updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));

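Trino's source code has very few comments. The TODO on this line suggests that updateConnectorIds is meant to be removed in a future version, but it is still worth studying for now.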

private static void updateConnectorIds(Announcer announcer, CatalogManager metadata)
{
    // get existing announcement
    ServiceAnnouncement announcement = getTrinoAnnouncement(announcer.getServiceAnnouncements());

    // automatically build connectorIds if not configured
    Set<String> connectorIds = metadata.getCatalogs().stream()
            .map(Catalog::getConnectorCatalogName)
            .map(Object::toString)
            .collect(toImmutableSet());

    // build announcement with updated sources
    ServiceAnnouncementBuilder builder = serviceAnnouncement(announcement.getType());
    builder.addProperties(announcement.getProperties());
    builder.addProperty("connectorIds", Joiner.on(',').join(connectorIds));

    // update announcement
    announcer.removeServiceAnnouncement(announcement.getId());
    announcer.addServiceAnnouncement(builder.build());
}

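This method revolves around rebuilding the ServiceAnnouncement and re-adding it to the Announcer instance. It performs the following steps:

1) Gets the ServiceAnnouncement of type trino from the Announcer instance by calling getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)

2) Builds the set of catalog names

3) Creates a new ServiceAnnouncementBuilder with the original announcement's type and properties, plus a connectorIds property holding the comma-joined catalog names

4) Removes the old ServiceAnnouncement by its id and adds the newly built one to the Announcer instance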
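The four steps can be modeled with plain JDK types. The sketch below is a simplified, hypothetical stand-in: Announcement and AnnouncerSketch are not airlift classes, and Java 16+ records are assumed. It highlights one detail visible in the ServiceAnnouncement source shown later: every instance gets a fresh random id, which is why the old announcement has to be removed by id before the rebuilt one is added.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of updateConnectorIds using only the JDK.
// Announcement and AnnouncerSketch stand in for airlift's ServiceAnnouncement and Announcer.
final class UpdateConnectorIdsSketch
{
    // Like ServiceAnnouncement: immutable properties, and a new random id per instance
    record Announcement(UUID id, String type, Map<String, String> properties)
    {
        static Announcement of(String type, Map<String, String> properties)
        {
            return new Announcement(UUID.randomUUID(), type, Map.copyOf(properties));
        }
    }

    // Like Announcer: holds the node's current announcements, keyed by id
    static final class AnnouncerSketch
    {
        private final Map<UUID, Announcement> announcements = new ConcurrentHashMap<>();

        void add(Announcement announcement)
        {
            announcements.put(announcement.id(), announcement);
        }

        void remove(UUID id)
        {
            announcements.remove(id);
        }

        Set<Announcement> all()
        {
            return Set.copyOf(announcements.values());
        }
    }

    static void updateConnectorIds(AnnouncerSketch announcer, Set<String> catalogNames)
    {
        // 1) find the existing announcement of type "trino"
        Announcement existing = announcer.all().stream()
                .filter(a -> a.type().equals("trino"))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Trino announcement not found"));

        // 2) + 3) copy the old properties and add the comma-joined catalog names
        //         (sorted here only to make the output deterministic)
        Map<String, String> properties = new LinkedHashMap<>(existing.properties());
        properties.put("connectorIds", String.join(",", new TreeSet<>(catalogNames)));

        // 4) remove the old announcement by id, then add the rebuilt one (which has a new id)
        announcer.remove(existing.id());
        announcer.add(Announcement.of(existing.type(), properties));
    }

    public static void main(String[] args)
    {
        AnnouncerSketch announcer = new AnnouncerSketch();
        announcer.add(Announcement.of("trino", Map.of("coordinator", "true")));
        updateConnectorIds(announcer, Set.of("mysql", "hive"));
        // Prints hive,mysql
        announcer.all().forEach(a -> System.out.println(a.properties().get("connectorIds")));
    }
}
```

The other properties of the original announcement (here the made-up "coordinator" entry) survive the rebuild; only connectorIds is added.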

getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)

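This method is simple: it returns the ServiceAnnouncement of type trino from the given set, and throws an IllegalArgumentException if there is none.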

private static ServiceAnnouncement getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)
{
    for (ServiceAnnouncement announcement : announcements) {
        if (announcement.getType().equals("trino")) {
            return announcement;
        }
    }
    throw new IllegalArgumentException("Trino announcement not found: " + announcements);
}

ServiceAnnouncement

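ServiceAnnouncement has come up several times, so what is its role?

The ServiceAnnouncement class lives in the discovery.client package of the airlift framework (io.airlift.discovery.client); io.airlift.discovery provides service discovery.

Judging from the source, ServiceAnnouncement is a class that declares, or wraps, a service: a type plus a map of string properties, identified by a randomly generated UUID.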

public class ServiceAnnouncement
{
    private final UUID id = UUID.randomUUID();
    private final String type;
    private final Map<String, String> properties;

    private ServiceAnnouncement(String type, Map<String, String> properties)
    {
        requireNonNull(type, "type is null");
        requireNonNull(properties, "properties is null");

        this.type = type;
        this.properties = ImmutableMap.copyOf(properties);
    }
    ......
}

This raises a question: what is the point of declaring the catalogs in a ServiceAnnouncement and adding it to the Announcer?

Announcer

After updateConnectorIds has run, the Announcer's start method is called, as shown in the code below:

// TODO: remove this huge hack
updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));

......

injector.getInstance(Announcer.class).start();

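The start method calls the announce method below, which handles the result of each announcement and schedules the next one. The core announcement step is delegated to the announce method of the DiscoveryAnnouncementClient implementation.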

private ListenableFuture<Duration> announce(long delayStart, Duration expectedDelay)
{
    // log announcement did not happen within 5 seconds of expected delay
    if (System.nanoTime() - (delayStart + expectedDelay.roundTo(NANOSECONDS)) > SECONDS.toNanos(5)) {
        log.error("Expected service announcement after %s, but announcement was delayed %s", expectedDelay, Duration.nanosSince(delayStart));
    }

    long requestStart = System.nanoTime();
    // Delegate the actual announcement to the DiscoveryAnnouncementClient implementation
    ListenableFuture<Duration> future = announcementClient.announce(getServiceAnnouncements());

    Futures.addCallback(future, new FutureCallback<Duration>()
    {
        @Override
        public void onSuccess(Duration expectedDelay)
        {
            errorBackOff.success();

            // wait 80% of the suggested delay
            expectedDelay = new Duration(expectedDelay.toMillis() * 0.8, MILLISECONDS);
            log.debug("Service announcement succeeded after %s. Next request will happen within %s", Duration.nanosSince(requestStart), expectedDelay);

            scheduleNextAnnouncement(expectedDelay);
        }

        @Override
        public void onFailure(Throwable t)
        {
            Duration duration = errorBackOff.failed(t);
            // todo this is a duplicate log message and should be remove after root cause of announcement delay is determined
            log.error("Service announcement failed after %s. Next request will happen within %s", Duration.nanosSince(requestStart), expectedDelay);
            scheduleNextAnnouncement(duration);
        }
    }, executor);

    return future;
}
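The timing rules in announce can be isolated into small pure functions. The sketch below uses java.time.Duration instead of airlift's Duration. The 80% factor and the 5-second lateness threshold come from the code above; the exponential backoff (doubling from a minimum, capped at a maximum) is a hypothetical stand-in for errorBackOff, whose actual policy is not shown in this article.

```java
import java.time.Duration;

// Hypothetical sketch of the scheduling rules in Announcer.announce, using java.time.Duration.
final class AnnounceScheduleSketch
{
    private static final Duration LATE_THRESHOLD = Duration.ofSeconds(5);

    private AnnounceScheduleSketch() {}

    // After a success, wait 80% of the delay suggested by the discovery server,
    // presumably so the node re-announces before its previous announcement expires
    static Duration nextDelayOnSuccess(Duration suggested)
    {
        return Duration.ofMillis((long) (suggested.toMillis() * 0.8));
    }

    // Same check as the first lines of announce(): did this run start more than 5 s later than planned?
    static boolean isLate(long delayStartNanos, Duration expectedDelay, long nowNanos)
    {
        return nowNanos - (delayStartNanos + expectedDelay.toNanos()) > LATE_THRESHOLD.toNanos();
    }

    // ASSUMPTION: simple exponential backoff standing in for airlift's errorBackOff.
    // 1st failure -> min; each further consecutive failure doubles the delay, capped at max
    static Duration backoff(int consecutiveFailures, Duration min, Duration max)
    {
        Duration delay = min;
        for (int i = 1; i < consecutiveFailures && delay.compareTo(max) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(max) > 0 ? max : delay;
    }

    public static void main(String[] args)
    {
        System.out.println(nextDelayOnSuccess(Duration.ofSeconds(10))); // PT8S
        System.out.println(backoff(3, Duration.ofSeconds(1), Duration.ofSeconds(30))); // PT4S
    }
}
```

Separating success and failure delays this way keeps a healthy node announcing at a steady cadence, while a node that cannot reach the discovery server backs off instead of retrying in a tight loop.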

DiscoveryAnnouncementClient

DiscoveryAnnouncementClient defines the announce (and unannounce) interface:

public interface DiscoveryAnnouncementClient
{
    Duration DEFAULT_DELAY = new Duration(10, TimeUnit.SECONDS);

    ListenableFuture<Duration> announce(Set<ServiceAnnouncement> services);

    ListenableFuture<Void> unannounce();
}

It has two implementations:

1) HttpDiscoveryAnnouncementClient: announces over HTTP

2) InMemoryDiscoveryClient: announces in memory
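To see what the interface contract amounts to, here is a hypothetical JDK-only analogue. CompletableFuture and java.time.Duration replace Guava's ListenableFuture and airlift's Duration, and the in-memory client simply records the latest announcement per node in a shared map, in the spirit of InMemoryDiscoveryClient (its real internals are not shown in this article).

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only analogue of DiscoveryAnnouncementClient plus an in-memory implementation.
final class InMemoryAnnouncementSketch
{
    interface AnnouncementClient<S>
    {
        Duration DEFAULT_DELAY = Duration.ofSeconds(10);

        // Publish this node's services; the future yields the suggested delay until the next announcement
        CompletableFuture<Duration> announce(Set<S> services);

        // Withdraw this node's services
        CompletableFuture<Void> unannounce();
    }

    // Keeps the latest announcement of each node in a shared map (the "registry")
    static final class InMemoryClient<S> implements AnnouncementClient<S>
    {
        private final String nodeId;
        private final Map<String, Set<S>> registry;

        InMemoryClient(String nodeId, Map<String, Set<S>> registry)
        {
            this.nodeId = nodeId;
            this.registry = registry;
        }

        @Override
        public CompletableFuture<Duration> announce(Set<S> services)
        {
            // Replace any previous announcement from this node
            registry.put(nodeId, Set.copyOf(services));
            return CompletableFuture.completedFuture(DEFAULT_DELAY);
        }

        @Override
        public CompletableFuture<Void> unannounce()
        {
            registry.remove(nodeId);
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args)
    {
        Map<String, Set<String>> registry = new ConcurrentHashMap<>();
        InMemoryClient<String> client = new InMemoryClient<>("node-1", registry);
        Duration delay = client.announce(Set.of("trino")).join();
        System.out.println(registry + " next in " + delay); // {node-1=[trino]} next in PT10S
        client.unannounce().join();
        System.out.println(registry); // {}
    }
}
```

Because announce returns a future of the next delay, the caller (Announcer) can schedule the next round without knowing whether the transport is HTTP or in-memory.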

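Here HttpDiscoveryAnnouncementClient is used, so let's look at how its announce method performs the announcement. discoveryServiceURI is loaded from the discovery.uri setting in config.properties, and the request URL is built by appending "/v1/announcement" and the nodeId to it.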

public ListenableFuture<Duration> announce(Set<ServiceAnnouncement> services)
{
    requireNonNull(services, "services is null");
    // All nodes in the cluster share the same discovery URI
    URI uri = discoveryServiceURI.get();
    if (uri == null) {
        return immediateFailedFuture(new DiscoveryException("No discovery servers are available"));
    }

    Announcement announcement = new Announcement(nodeInfo.getEnvironment(), nodeInfo.getNodeId(), nodeInfo.getPool(), nodeInfo.getLocation(), services);
    // Build the HTTP PUT request with a JSON body
    Request request = preparePut()
            .setUri(createAnnouncementLocation(uri, nodeInfo.getNodeId()))
            .setHeader("User-Agent", nodeInfo.getNodeId())
            .setHeader("Content-Type", MEDIA_TYPE_JSON.toString())
            .setBodyGenerator(jsonBodyGenerator(announcementCodec, announcement))
            .build();
    // Execute asynchronously and handle the response
    return httpClient.executeAsync(request, new DiscoveryResponseHandler<Duration>("Announcement", uri)
    {
        @Override
        public Duration handle(Request request, Response response)
                throws DiscoveryException
        {
            int statusCode = response.getStatusCode();
            if (!isSuccess(statusCode)) {
                throw new DiscoveryException(String.format("Announcement failed with status code %s: %s", statusCode, getBodyForError(response)));
            }

            Duration maxAge = extractMaxAge(response);
            return maxAge;
        }
    });
}
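The request construction can be reproduced with the JDK's own java.net.http.HttpRequest. The sketch below only builds the PUT request (it does not send it) and parses a max-age value from a Cache-Control header. Reading Cache-Control is an assumption about what extractMaxAge does, since its body is not shown above; the 10-second fallback mirrors DiscoveryAnnouncementClient.DEFAULT_DELAY. The host coordinator:8080 and the node id node-1 in the usage are made up.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical JDK-only sketch of the request handling in HttpDiscoveryAnnouncementClient.announce.
final class HttpAnnounceSketch
{
    static final Duration DEFAULT_DELAY = Duration.ofSeconds(10);
    private static final Pattern MAX_AGE = Pattern.compile("max-age=(\\d+)");

    private HttpAnnounceSketch() {}

    // discovery.uri + "/v1/announcement/" + nodeId (assumes nodeId needs no URL escaping)
    static URI announcementLocation(URI discoveryUri, String nodeId)
    {
        String base = discoveryUri.toString();
        if (base.endsWith("/")) {
            base = base.substring(0, base.length() - 1);
        }
        return URI.create(base + "/v1/announcement/" + nodeId);
    }

    // Builds (but does not send) the PUT request; the JSON body is passed in already serialized
    static HttpRequest buildRequest(URI discoveryUri, String nodeId, String jsonBody)
    {
        return HttpRequest.newBuilder(announcementLocation(discoveryUri, nodeId))
                .header("User-Agent", nodeId)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // ASSUMPTION: the next delay comes from "Cache-Control: max-age=N"; otherwise fall back to 10 s
    static Duration maxAge(Optional<String> cacheControl)
    {
        if (cacheControl.isPresent()) {
            Matcher matcher = MAX_AGE.matcher(cacheControl.get());
            if (matcher.find()) {
                long seconds = Long.parseLong(matcher.group(1));
                if (seconds > 0) {
                    return Duration.ofSeconds(seconds);
                }
            }
        }
        return DEFAULT_DELAY;
    }

    public static void main(String[] args)
    {
        HttpRequest request = buildRequest(URI.create("http://coordinator:8080"), "node-1", "{}");
        System.out.println(request.method() + " " + request.uri()); // PUT http://coordinator:8080/v1/announcement/node-1
        System.out.println(maxAge(Optional.of("max-age=30"))); // PT30S
    }
}
```

Using PUT to a URL keyed by nodeId makes the announcement idempotent: repeating it simply refreshes the same node's entry on the discovery server.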

Flow Summary

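To summarize the updateConnectorIds flow:

1) Before updateConnectorIds is called, the catalogs and connectors have already been created.

2) updateConnectorIds writes the catalog names into the ServiceAnnouncement (as the connectorIds property) and adds it to the Announcer.

3) After updateConnectorIds, injector.getInstance(Announcer.class).start() calls the Announcer's start method, which starts the scheduled announcement loop.

4) From the Announcer's start method, HttpDiscoveryAnnouncementClient's announce sends this node's ServiceAnnouncement information to the discovery server at discovery.uri, through which the rest of the cluster learns about it. This is how node information is synchronized across the cluster.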
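On the question raised earlier about why connectorIds is announced at all: as far as I can tell, in Trino versions of this period the coordinator's node manager (DiscoveryNodeManager) reads each node's connectorIds property to work out which nodes can serve which catalog; verify this against your version. The sketch below shows only that inversion step, using a hypothetical helper and plain maps in place of the discovery data.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: invert each node's comma-separated "connectorIds" property
// into a catalog -> nodes index. Not Trino's DiscoveryNodeManager code.
final class ConnectorIdsIndexSketch
{
    private ConnectorIdsIndexSketch() {}

    // connectorIdsByNode: nodeId -> value of that node's "connectorIds" property (may be null)
    static Map<String, Set<String>> nodesByCatalog(Map<String, String> connectorIdsByNode)
    {
        Map<String, Set<String>> index = new TreeMap<>();
        connectorIdsByNode.forEach((nodeId, connectorIds) -> {
            if (connectorIds == null) {
                return; // this node did not announce any catalogs
            }
            for (String catalog : connectorIds.toLowerCase(Locale.ENGLISH).split(",")) {
                if (!catalog.isBlank()) {
                    index.computeIfAbsent(catalog.trim(), key -> new TreeSet<>()).add(nodeId);
                }
            }
        });
        return index;
    }

    public static void main(String[] args)
    {
        Map<String, String> announced = new HashMap<>();
        announced.put("worker-1", "hive,mysql");
        announced.put("worker-2", "hive");
        announced.put("worker-3", null);
        System.out.println(nodesByCatalog(announced)); // {hive=[worker-1, worker-2], mysql=[worker-1]}
    }
}
```

This also hints at why the mechanism matters for dynamic data source management: a catalog added at runtime is invisible to this index until the node's connectorIds announcement is rebuilt.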


Summary

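This article first outlined the catalog creation flow and introduced the important classes involved in it.

It then used updateConnectorIds to show how catalog information is registered with airlift, and how airlift's service discovery mechanism propagates it to the other nodes in the cluster.

This concludes the source code walkthrough of catalog loading. Upcoming articles will cover approaches to implementing dynamic data source management as a secondary development of Trino.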


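Copyright notice: This is an original article by qaz1qaz1qaz2, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.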