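Preface
The previous article described how to add a REST API endpoint to Trino and walked through how Trino loads and creates a Catalog. For reasons of length, it did not examine the source code of updateConnectorIds.
This article covers two topics:
1) The classes involved in creating a Trino Catalog
2) An examination of updateConnectorIds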
I. Classes involved in Catalog creation
1. The Catalog creation flow is shown in the figure below

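2. Several of the more important classes involved are introduced below
- CatalogName: defines three kinds of catalog names: the catalog itself, and the names prefixed with $info_schema@ and $system@.
- Catalog: holds both CatalogName and Connector objects, three of each: one for the catalog itself, one for informationSchema, and one for systemTables.
- CatalogManager: the container class that manages Catalogs.
- Connector: an interface in the SPI package. It defines the methods needed to connect to a data source, such as index access, page-based data handling, and result retrieval, and so provides the standard for adding new types of data sources.
- MaterializedConnector: an inner class of ConnectorManager that wraps everything a connector needs, for example ConnectorSplitManager for managing splits, ConnectorPageSourceProvider and ConnectorPageSinkProvider for page reads and writes, ConnectorIndexProvider for indexes, and ConnectorNodePartitioningProvider for node partitioning, as well as the various table-level PropertyMetadata properties.
- ConnectorFactory: an interface in the SPI package. It defines create, which creates a Connector, and getHandleResolver, which returns the handle resolver, and so provides the standard for adding new types of data sources.
- ConnectorManager: the container class that manages Connectors. It wraps many of the important classes that interact with data sources, such as MetadataManager, SplitManager, HandleResolver, TransactionManager, and EventListenerManager, and also provides important methods such as createCatalog() and createConnector().
- HandleResolver: a container class that maintains the mapping between a catalog and the handle classes of its data source. Its role is similar to the container in Spring Boot that maps URLs to their handler methods.
- EventListenerManager: the class that manages event listeners.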
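To make the three kinds of catalog names concrete, here is a minimal, self-contained sketch. It does not use Trino's own classes: CatalogNamesSketch and its helper methods are hypothetical stand-ins that only mirror the $info_schema@ and $system@ prefixes described above.

```java
import java.util.List;

// Hypothetical stand-in, not Trino's CatalogName: it only mirrors the
// naming scheme described above for the three kinds of catalog names.
final class CatalogNamesSketch
{
    private static final String INFORMATION_SCHEMA_PREFIX = "$info_schema@";
    private static final String SYSTEM_TABLES_PREFIX = "$system@";

    private CatalogNamesSketch() {}

    // Name of the derived information-schema catalog for a user catalog
    static String informationSchemaName(String catalog)
    {
        return INFORMATION_SCHEMA_PREFIX + catalog;
    }

    // Name of the derived system-tables catalog for a user catalog
    static String systemTablesName(String catalog)
    {
        return SYSTEM_TABLES_PREFIX + catalog;
    }

    // True for the two derived names, false for the catalog itself
    static boolean isInternal(String name)
    {
        return name.startsWith(INFORMATION_SCHEMA_PREFIX) || name.startsWith(SYSTEM_TABLES_PREFIX);
    }

    // The catalog itself followed by its two derived names
    static List<String> allNames(String catalog)
    {
        return List.of(catalog, informationSchemaName(catalog), systemTablesName(catalog));
    }

    public static void main(String[] args)
    {
        // prints [hive, $info_schema@hive, $system@hive]
        System.out.println(allNames("hive"));
    }
}
```

The catalog name "hive" is only an example; any catalog name yields the same three-way split.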
II. updateConnectorIds
The previous article examined the line injector.getInstance(StaticCatalogStore.class).loadCatalogs() in depth. The second important piece of code is updateConnectorIds.

    // TODO: remove this huge hack
    updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));

The Trino code base has very few comments. The comment on this line suggests that updateConnectorIds is meant to be removed in a future version, but it is still worth studying for now.

    private static void updateConnectorIds(Announcer announcer, CatalogManager metadata)
    {
        // get existing announcement
        ServiceAnnouncement announcement = getTrinoAnnouncement(announcer.getServiceAnnouncements());

        // automatically build connectorIds if not configured
        Set<String> connectorIds = metadata.getCatalogs().stream()
                .map(Catalog::getConnectorCatalogName)
                .map(Object::toString)
                .collect(toImmutableSet());

        // build announcement with updated sources
        ServiceAnnouncementBuilder builder = serviceAnnouncement(announcement.getType());
        builder.addProperties(announcement.getProperties());
        builder.addProperty("connectorIds", Joiner.on(',').join(connectorIds));

        // update announcement
        announcer.removeServiceAnnouncement(announcement.getId());
        announcer.addServiceAnnouncement(builder.build());
    }

This method revolves around updating the ServiceAnnouncement and adding it back to the Announcer instance. It does the following:
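1) Get the ServiceAnnouncement of type trino from the Announcer instance, by calling getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)
2) Build the set of catalog names
3) Build a new ServiceAnnouncementBuilder that copies the existing properties and sets connectorIds to the catalog names
4) Replace the old ServiceAnnouncement in the Announcer instance with the rebuilt one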
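The four steps above can be sketched without any airlift or Guava dependency. In the sketch below, Announcement is a hypothetical, simplified stand-in for ServiceAnnouncement, a plain Set stands in for the Announcer, and the "coordinator" property is just an illustrative existing property; only the find, rebuild, and replace pattern is meant to carry over.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for airlift's ServiceAnnouncement and Announcer;
// only the find / rebuild / replace pattern is modeled here.
final class UpdateConnectorIdsSketch
{
    // Simplified announcement: a type plus immutable string properties
    static final class Announcement
    {
        final String type;
        final Map<String, String> properties;

        Announcement(String type, Map<String, String> properties)
        {
            this.type = type;
            this.properties = Map.copyOf(properties);
        }
    }

    private UpdateConnectorIdsSketch() {}

    static void updateConnectorIds(Set<Announcement> announcements, List<String> catalogNames)
    {
        // 1) find the existing announcement of type "trino"
        Announcement existing = announcements.stream()
                .filter(a -> a.type.equals("trino"))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Trino announcement not found"));

        // 2) and 3) copy the old properties and set connectorIds to the joined catalog names
        Map<String, String> properties = new LinkedHashMap<>(existing.properties);
        properties.put("connectorIds", String.join(",", catalogNames));

        // 4) replace the old announcement with the rebuilt one
        announcements.remove(existing);
        announcements.add(new Announcement(existing.type, properties));
    }

    public static void main(String[] args)
    {
        Set<Announcement> announcements = new LinkedHashSet<>();
        announcements.add(new Announcement("trino", Map.of("coordinator", "true")));
        updateConnectorIds(announcements, List.of("hive", "mysql"));
        // prints hive,mysql
        System.out.println(announcements.iterator().next().properties.get("connectorIds"));
    }
}
```

Because the stand-in announcement is immutable, as ServiceAnnouncement is, the update has to be a remove followed by an add rather than an in-place change.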
getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)
This method is simple: it returns the ServiceAnnouncement of type trino from the given set.

    private static ServiceAnnouncement getTrinoAnnouncement(Set<ServiceAnnouncement> announcements)
    {
        for (ServiceAnnouncement announcement : announcements) {
            if (announcement.getType().equals("trino")) {
                return announcement;
            }
        }
        throw new IllegalArgumentException("Trino announcement not found: " + announcements);
    }

ServiceAnnouncement
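ServiceAnnouncement has come up several times now, so what is it for?
The ServiceAnnouncement class lives in the discovery.client package of the airlift framework; io.airlift.discovery is used for service discovery.
Judging from the source, ServiceAnnouncement is a class that declares, or wraps, a service.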

    public class ServiceAnnouncement
    {
        private final UUID id = UUID.randomUUID();
        private final String type;
        private final Map<String, String> properties;

        private ServiceAnnouncement(String type, Map<String, String> properties)
        {
            requireNonNull(type, "type is null");
            requireNonNull(properties, "properties is null");
            this.type = type;
            this.properties = ImmutableMap.copyOf(properties);
        }
        ......
    }

This raises a question: what is the purpose of declaring the catalogs in a ServiceAnnouncement and adding it to the Announcer?
Announcer
After updateConnectorIds finishes, the Announcer's start method is executed, as the following code shows.
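
    // TODO: remove this huge hack
    updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));
    ......
    injector.getInstance(Announcer.class).start();

Internally, start calls the announce method below, which handles the result of each announcement and schedules the next one. The core of the announcement itself is delegated to a class that implements the announce method of the DiscoveryAnnouncementClient interface.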

    private ListenableFuture<Duration> announce(long delayStart, Duration expectedDelay)
    {
        // log announcement did not happen within 5 seconds of expected delay
        if (System.nanoTime() - (delayStart + expectedDelay.roundTo(NANOSECONDS)) > SECONDS.toNanos(5)) {
            log.error("Expected service announcement after %s, but announcement was delayed %s", expectedDelay, Duration.nanosSince(delayStart));
        }

        long requestStart = System.nanoTime();
        // delegate the actual announcement to the DiscoveryAnnouncementClient implementation
        ListenableFuture<Duration> future = announcementClient.announce(getServiceAnnouncements());

        Futures.addCallback(future, new FutureCallback<Duration>()
        {
            @Override
            public void onSuccess(Duration expectedDelay)
            {
                errorBackOff.success();

                // wait 80% of the suggested delay
                expectedDelay = new Duration(expectedDelay.toMillis() * 0.8, MILLISECONDS);
                log.debug("Service announcement succeeded after %s. Next request will happen within %s", Duration.nanosSince(requestStart), expectedDelay);

                scheduleNextAnnouncement(expectedDelay);
            }

            @Override
            public void onFailure(Throwable t)
            {
                Duration duration = errorBackOff.failed(t);
                // todo this is a duplicate log message and should be remove after root cause of announcement delay is determined
                log.error("Service announcement failed after %s. Next request will happen within %s", Duration.nanosSince(requestStart), expectedDelay);
                scheduleNextAnnouncement(duration);
            }
        }, executor);

        return future;
    }

DiscoveryAnnouncementClient
DiscoveryAnnouncementClient defines the announce interface.

    public interface DiscoveryAnnouncementClient
    {
        Duration DEFAULT_DELAY = new Duration(10, TimeUnit.SECONDS);

        ListenableFuture<Duration> announce(Set<ServiceAnnouncement> services);

        ListenableFuture<Void> unannounce();
    }

It has two implementations:
1) HttpDiscoveryAnnouncementClient: announces over HTTP
2) InMemoryDiscoveryClient: announces in memory
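The announcement here goes through HttpDiscoveryAnnouncementClient, so let's look at how its announce method works. discoveryServiceURI is loaded from the discovery.uri setting in config.properties, and "/v1/announcement" plus the nodeId are appended to it to build the request URL.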
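As a rough illustration of the request shape just described, the sketch below builds (but does not send) a PUT request to discovery.uri + "/v1/announcement/" + nodeId. It uses the JDK's java.net.http API rather than airlift's HTTP client, and the class name, the example URI, the node id, and the empty JSON body are all hypothetical placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Sketch only: builds (but does not send) a PUT request shaped like the one
// described above. It uses the JDK's java.net.http API rather than airlift's
// HTTP client, and the JSON body is a hand-written placeholder.
final class AnnouncementRequestSketch
{
    private AnnouncementRequestSketch() {}

    // discovery.uri + "/v1/announcement/" + nodeId
    static URI announcementLocation(URI discoveryUri, String nodeId)
    {
        String base = discoveryUri.toString();
        if (base.endsWith("/")) {
            // avoid a double slash when the configured URI ends with "/"
            base = base.substring(0, base.length() - 1);
        }
        return URI.create(base + "/v1/announcement/" + nodeId);
    }

    // PUT request carrying the announcement as a JSON body
    static HttpRequest announcementRequest(URI discoveryUri, String nodeId, String jsonBody)
    {
        return HttpRequest.newBuilder(announcementLocation(discoveryUri, nodeId))
                .header("User-Agent", nodeId)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args)
    {
        // Hypothetical discovery URI and node id
        HttpRequest request = announcementRequest(URI.create("http://coordinator.example.com:8080"), "node-1", "{}");
        // prints PUT http://coordinator.example.com:8080/v1/announcement/node-1
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The actual announce method of HttpDiscoveryAnnouncementClient follows.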

    public ListenableFuture<Duration> announce(Set<ServiceAnnouncement> services)
    {
        requireNonNull(services, "services is null");

        // nodes in the cluster share the same discovery URI
        URI uri = discoveryServiceURI.get();
        if (uri == null) {
            return immediateFailedFuture(new DiscoveryException("No discovery servers are available"));
        }

        Announcement announcement = new Announcement(nodeInfo.getEnvironment(), nodeInfo.getNodeId(), nodeInfo.getPool(), nodeInfo.getLocation(), services);
        // build the HTTP request
        Request request = preparePut()
                .setUri(createAnnouncementLocation(uri, nodeInfo.getNodeId()))
                .setHeader("User-Agent", nodeInfo.getNodeId())
                .setHeader("Content-Type", MEDIA_TYPE_JSON.toString())
                .setBodyGenerator(jsonBodyGenerator(announcementCodec, announcement))
                .build();
        // execute asynchronously and handle the response
        return httpClient.executeAsync(request, new DiscoveryResponseHandler<Duration>("Announcement", uri)
        {
            @Override
            public Duration handle(Request request, Response response)
                    throws DiscoveryException
            {
                int statusCode = response.getStatusCode();
                if (!isSuccess(statusCode)) {
                    throw new DiscoveryException(String.format("Announcement failed with status code %s: %s", statusCode, getBodyForError(response)));
                }

                Duration maxAge = extractMaxAge(response);
                return maxAge;
            }
        });
    }

Flow summary
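To summarize, the flow around updateConnectorIds is:
1) Before updateConnectorIds is called, the catalogs and connectors have already been created.
2) When updateConnectorIds is called, the catalog names are written into the ServiceAnnouncement, which is then added back to the Announcer.
3) After updateConnectorIds returns, injector.getInstance(Announcer.class).start() is executed, which calls the Announcer's start method to start the announcement scheduling thread.
4) Inside the Announcer's start method, the announce method of HttpDiscoveryAnnouncementClient sends this node's ServiceAnnouncement to the discovery service configured by discovery.uri, which is how the node's information is shared with the rest of the cluster.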
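The timing in steps 3) and 4) can be sketched as a small state machine: after a success, wait 80% of the delay suggested by the discovery service; after a failure, wait a back-off delay that grows with each failure. AnnounceScheduleSketch is hypothetical and is not airlift's Announcer; in particular the back-off bounds (1 ms doubling up to 1 s) are illustrative assumptions, not values taken from the source.

```java
// Hypothetical sketch of the re-announcement timing described above; it is
// not airlift's Announcer. The back-off bounds are illustrative assumptions.
final class AnnounceScheduleSketch
{
    private static final long MIN_BACKOFF_MILLIS = 1;
    private static final long MAX_BACKOFF_MILLIS = 1_000;

    private long backoffMillis = MIN_BACKOFF_MILLIS;

    // After a successful announcement: reset the back-off and wait 80% of
    // the delay suggested by the discovery service before announcing again.
    long onSuccess(long suggestedDelayMillis)
    {
        backoffMillis = MIN_BACKOFF_MILLIS;
        return Math.round(suggestedDelayMillis * 0.8);
    }

    // After a failed announcement: wait the current back-off, then double it
    // (capped at the maximum) for the next failure.
    long onFailure()
    {
        long delay = backoffMillis;
        backoffMillis = Math.min(backoffMillis * 2, MAX_BACKOFF_MILLIS);
        return delay;
    }

    public static void main(String[] args)
    {
        AnnounceScheduleSketch schedule = new AnnounceScheduleSketch();
        System.out.println(schedule.onSuccess(10_000)); // prints 8000
        System.out.println(schedule.onFailure());       // prints 1
        System.out.println(schedule.onFailure());       // prints 2
    }
}
```

Announcing again before the suggested delay has fully elapsed leaves a margin so that the node's entry is refreshed before it can expire.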
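Summary
This article first laid out the Catalog creation flow and introduced the important classes involved in it.
It then used updateConnectorIds to show how catalog information is registered with airlift and how airlift's service discovery mechanism makes that information available to the other nodes in the cluster.
This concludes the walkthrough of the Catalog loading source code. Later articles will cover a custom development approach for dynamic data source management.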