知是行的主意,行是知的功夫。这篇文章主要讲述Tars | 第2篇 TarsJava SpingBoot启动与负载均衡源码初探 #yyds干货盘点#相关的知识,希望能为你提供帮助。
@[TOC](Tarsjava SpingBoot启动与负载均衡源码初探)
前言通过源码分析可以得出这样一个负载均衡的源码结构图(基于TarsJava SpringBoot):
@EnableTarsServer注解:表明这是一个Tars服务;
- @Import(TarsServerConfiguration.class):引入Tars服务相关配置文件;
- Communcator:通信器;
- getServantProxyFactory():获取代理工厂管理者;
- getObjectProxyFactory():获取对象代理工厂;
- createLoadBalance():创建客户端负载均衡调用器;
- select():选择负载均衡调用器(有四种模式可以选择);
- invoker:调用器;
- invoke():具体的执行方法;
- doInvokeServant():最底层的执行方法;
- refresh():更新负载均衡调用器;
- createProtocolInvoker():创建协议调用器;
@注解
- 中文含义
- 加在哪
- 其他……
语句示例
//代码示例
文章图片
可以发现它与普通SpringBoot应用的区别在于多了个
@EnableTarsServer
注解;@EnableTarsServer
- Tars服务;
- 用在主启动类上;
- 表名该服务是一个Tars服务,启用Tars功能;
@EnableTarsServer
注解源码:@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TarsServerConfiguration.class)
public @interface EnableTarsServer
可以知道他帮我们引入了Tars服务配置类
TarsServerConfiguration.class
,我们点进去:@Configuration
public class TarsServerConfiguration private final Server server = Server.getInstance();
@Bean
public Server server()
return this.server;
@Bean
// 从通信器工厂注入通信器Communcator
public Communicator communicator()
return CommunicatorFactory.getInstance().getCommunicator();
@Bean
//通信器后置处理器
public CommunicatorBeanPostProcessor communicatorBeanPostProcessor(Communicator communicator)
return new CommunicatorBeanPostProcessor(communicator);
@Bean
//注入配置帮助器
public ConfigHelper configHelper()
return ConfigHelper.getInstance();
@Bean
//注入Servlet容器定制器
public ServletContainerCustomizer servletContainerCustomizer()
return new ServletContainerCustomizer();
@Bean
//Tars服务器启动生命周期
public TarsServerStartLifecycle applicationStartLifecycle(Server server)
return new TarsServerStartLifecycle(server);
在这些容器中,可以看出最重要的是通信器
Communicator
,里面定义了代理方式、配置文件、负载均衡选择器等重要属性,下面我们来分析这个容器2. Communicator通信器通过源码分析,我们可以知道这个容器里有通信器相关初始化
initCommunicator()
、关闭shutdown()
、获取容器idgetId()
等基础方法,此外,有几个比较关键的方法:文章图片
getCommunicatorConfig
:获取客户端协调器的配置文件。该配置文件里做了一些超时、线程数等相关配置;
文章图片
getServantProxyFactory
:获取代理工厂管理者。管理者的主要作用是管理ObjectProxyFactory,如果缓存有就从缓存中取,没有就生产;
public < T> Object getServantProxy(Class< T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker< T> protocolInvoker) //获取管理者的键 String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName; //通过键从缓存中获取管理者的值 Object proxy = cache.get(key); if (proxy == null) lock.lock(); try proxy = cache.get(key); if (proxy == null) //创建管理者 ObjectProxy< T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy( clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker); //将管理者放进缓存 cache.put(key, createProxy(clazz, objectProxy)); proxy = cache.get(key); finally lock.unlock(); return proxy;
getObjectProxyFactory
:获取对象代理工厂。该工厂的作用是生产对象代理ObjectProxy,包括创建Servant服务的配置信息与更新服务端点等:
//生产对象代理ObjectProxy public < T> ObjectProxy< T> getObjectProxy(Class< T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig, LoadBalance< T> loadBalance, ProtocolInvoker< T> protocolInvoker) throws ClientException //如果容器里没有服务代理相关配置,则生成默认配置;如果容器里有服务代理相关配置,说明用户自定义了用户配置了服务代理,则读取用户配置文件进行自定义配置(SpringBoot的核心思想之一) if (servantProxyConfig == null) servantProxyConfig = createServantProxyConfig(objName, setDivision); else servantProxyConfig.setCommunicatorId(communicator.getId()); servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision()); servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator()); addSetDivisionInfo(servantProxyConfig, setDivision); servantProxyConfig.setRefreshInterval(communicator.getCommunicatorConfig().getRefreshEndpointInterval()); servantProxyConfig.setReportInterval(communicator.getCommunicatorConfig().getReportInterval()); //更新服务端点 updateServantEndpoints(servantProxyConfig); //【重要】创建客户端负载均衡调用器 if (loadBalance == null) loadBalance = createLoadBalance(servantProxyConfig); //创建协议调用器 if (protocolInvoker == null) protocolInvoker = createProtocolInvoker(api, servantProxyConfig); return new ObjectProxy< T> (api, servantProxyConfig, loadBalance, protocolInvoker, communicator); ……//创建Servant服务的配置信息 private ServantProxyConfig createServantProxyConfig(String objName, String setDivision) throws CommunicatorConfigException …………//更新服务端点:通过ObjectName判断是有设置了服务器节点,如果有(本地只连接),如果没有那就从tars管理中获取服务器节点。放在ServantCacheManager管理起来。 private void updateServantEndpoints(ServantProxyConfig cfg) CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig(); ……
*除了创建了一个负载均衡调用器
LoadBalance
,还创建了一个协议调用器protocolInvoker
,该协议调用器里分别对同步与异步调用方法、Tars与Http协议请求处理、以及过滤器等相关配置,但我们的重点不在这,下面将着重分析LoadBalance
。3. 客户端的负载均衡调用器LoadBalance我们点进去查看原有负载均衡逻辑,发现这是一个接口,里面定义了两个方法,都是与负载均衡调用器相关的:
public interface LoadBalance<
T>
/**
* 选择负载均衡调用器
* @param 调用的上下文
* @return
* @throws 无负载均衡调用器 - 异常
*/
Invoker<
T>
select(InvokeContext invokeContext) throws NoInvokerException;
/**
* 刷新本地负载均衡调用器
* @param 负载均衡调用器
*/
void refresh(Collection<
Invoker<
T>
>
invokers);
我们Ctrl+H一下即可发现该接口有四个实现类:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KJyHqhl5-1627523692062)(https://lexiangla.com/assets/3963f704ee0411ebbe94aee286d18512 " 负载均衡调用器实现类" )]
分别是:
- ConsistentHashLoadBalance:一致hash选择器;
- HashLoadBalance:hash选择器;
- RoundRobinLoadBalance: 轮询选择器;
- DefaultLoadBalance:默认的选择器(由源码可知先ConsistentHashLoadBalance,HashLoadBalance,RoundRobinLoadBalance);
RoundRobinLoadBalance
的select方法:@Override
public Invoker<
T>
select(InvokeContext invocation) throws NoInvokerException //静态权重缓存器列表
List<
Invoker<
T>
>
staticWeightInvokers = staticWeightInvokersCache;
//使用权重轮询
if (staticWeightInvokers != null &
&
!staticWeightInvokers.isEmpty())
//【体现轮询】根据index获取一个调用器,规则是:获取“静态权重顺序递增值”的绝对值后对“静态权重缓存器数”取余?
Invoker<
T>
invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() &
Integer.MAX_VALUE) % staticWeightInvokers.size());
//如果调用器存活则直接返回
if (invoker.isAvailable()) return invoker;
//判断存活:先根据调用器的url获取“调用器活动状态”,判断:状态的“上次重试时间”+“尝试重启时间间隔” <
“系统当前时间”,存活则将系统当前时间设置为“上次重启时间”
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) <
System.currentTimeMillis())
logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
stat.setLastRetryTime(System.currentTimeMillis());
return invoker;
//无权重轮询,抛出异常
List<
Invoker<
T>
>
sortedInvokers = sortedInvokersCache;
if (CollectionUtils.isEmpty(sortedInvokers))
throw new NoInvokerException("no such active connection invoker");
List<
Invoker<
T>
>
list = new ArrayList<
Invoker<
T>
>
();
for (Invoker<
T>
invoker : sortedInvokers)
//如果调用器挂了
if (!invoker.isAvailable()) //尝试救回调用器:先根据调用器的url获取“调用器活动状态”,判断:状态的“上次重试时间”+“尝试重启时间间隔” <
“系统当前时间”,存活则加入到list中,挂了就不加入
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) <
System.currentTimeMillis())
list.add(invoker);
else
//调用器存活则将调用器添加到list里
list.add(invoker);
//TODO When all is not available. Whether to randomly extract one
if (list.isEmpty())
throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
//随机获取一个调用器?
Invoker<
T>
invoker = list.get((sequence.getAndIncrement() &
Integer.MAX_VALUE) % list.size());
//如果调用器不存活,则将当前系统时间设置为该调用器的上次重启时间
if (!invoker.isAvailable())
//Try to recall after blocking
logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
return invoker;
可以看出
select
方法重点还是在于“怎样”找到一个负载均衡调用器,只不过实现的方法不同,有的采用轮询的方法、有的根据hash值,而我们关注的是给负载均衡方法做扩展(增添路由规则),因此这里也不是重点。但为我们指明了一个方向,就是上面源码里反复提到的invoker
调用器(invoker老眼熟了,SpringBoot里的controller参数处理里也有它)。我们来看看Tars里的
invoker
,它也是一个接口,只有一个实现类,public interface Invoker<
T>
//获取uil
Url getUrl();
//获取api
Class<
T>
getApi();
//判断是否存活
boolean isAvailable();
//执行方法
Object invoke(InvokeContext context) throws Throwable;
//销毁方法
void destroy();
文章图片
通过对这几个实现类的源码阅读,我们发现
invoke
方法就是对doInvokeServant
底层方法进行层层封装。通过对
TarsInvoker
的源码阅读,我们还可以知道TarsInvoker有四个属性config、api、url、clients,对应前面提到的getXXX对应方法;还可以设置是否存活,对应前文对是否存活的判断。在doInvokeServant
里最核心的操作流程是try里面的语句:public class TarsInvoker<
T>
extends ServantInvoker<
T>
final List<
Filter>
filters;
public TarsInvoker(ServantProxyConfig config, Class<
T>
api, Url url, ServantClient[] clients)
super(config, api, url, clients);
filters = AppContextManager.getInstance().getAppContext() == null ? null : AppContextManager.getInstance().getAppContext().getFilters(FilterKind.CLIENT);
@Override
public void setAvailable(boolean available)
super.setAvailable(available);
@Override
protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable
final long begin = System.currentTimeMillis();
int ret = Constants.INVOKE_STATUS_SUCC;
try
//根据api获取将要执行的方法
Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());
//如果是异步调用
if (inv.isAsync())
//执行异步方法
invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;
//如果是承诺未来???
else if (inv.isPromiseFuture())
return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());
// return Future Result
else
//执行同步方法
TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
if (response.getRet() != TarsHelper.SERVERSUCCESS)
throw ServerException.makeException(response.getRet(), response.getRemark());
return response.getResult();
catch (Throwable e)
if (e instanceof TimeoutException)
ret = Constants.INVOKE_STATUS_TIMEOUT;
else if (e instanceof NotConnectedException)
ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
else
ret = Constants.INVOKE_STATUS_EXEC;
throw e;
finally
if (inv.isNormal())
setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
InvokeStatHelper.getInstance().addProxyStat(objName)
.addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
……
而try语句里主要做的是执行的调用方法(异步、同步、承诺未来),由于我们要扩展的路由功能与调用方法无关,这里就不深入分析了。
由此我们可以分析得出负载均衡设计的底层结构图:
@EnableTarsServer注解:表明这是一个Tars服务;
- @Import(TarsServerConfiguration.class):引入Tars服务相关配置文件;
- Communcator:通信器;
- getServantProxyFactory():获取代理工厂管理者;
- getObjectProxyFactory():获取对象代理工厂;
- createLoadBalance():创建客户端负载均衡调用器;
- select():选择负载均衡调用器(有四种模式可以选择);
- invoker:调用器;
- invoke():具体的执行方法;
- doInvokeServant():最底层的执行方法;
- refresh():更新负载均衡调用器;
- createProtocolInvoker():创建协议调用器;
新人制作,如有错误,欢迎指出,感激不尽!
:::
::: hljs-center
欢迎关注公众号,会分享一些更日常的东西!
:::
::: hljs-center
如需转载,请标注出处!
:::
::: hljs-center
文章图片
:::
推荐阅读
- Keepalived+LVS实战案例( 单主架构实现WEB负载均衡及可用)
- 盘点微软对Windows8 Beta关键技巧改进
- 无法更新到windows8.1的原因与处理办法
- 微软Build开发者大会:Win 8 Update更新5大变化
- 本地离线安装Win 8 Update简体中文语言包
- win8.1系统skydrive中的特定文件夹设置
- 在线安装Win 8 Update简体中文语言包的办法
- 图文 打开Win8系统Hyper-V虚拟机管理器的办法
- 图文 如何找出Win8关机按钮并创建快捷方式