[The 600K Annual Salary Watershed] Hand-Writing Apache Dubbo on Top of Netty (with a Registry and Annotations)

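1. A detailed look at the underlying principles of network communication in a distributed microservice architecture (illustrated)
2. (The 600K-salary skill) After five years on the job, do you really understand Netty and why to use it? (in depth)
3. A deep dive into Netty's core components (diagrams and examples)
4. Details that BAT interviews always ask about: ByteBuf in Netty explained
5. How does Netty solve the packet splitting and sticking problem? Broken down through many hands-on cases
6. Implementing a custom message communication protocol on Netty (protocol design and parsing in practice)
7. A detailed and complete guide to serialization techniques, with in-depth analysis and hands-on application
8. A step-by-step guide to building a basic RPC framework on Netty (easy to follow)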
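In this article we continue to improve the basic hand-written Netty RPC framework from the previous article. The main additions are:
  • Integrating Spring to enable annotation-driven configuration
  • Integrating ZooKeeper to implement service registration
  • Adding a load-balancing implementation
Adding annotation-driven configuration
The modules that need to change are:
  • netty-rpc-protocol
  • netty-rpc-provider
netty-rpc-protocol
The classes modified in this module are shown below.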
Figure 7-1
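The changes to the netty-rpc-protocol module are as follows.
Adding the annotation driver
The @GpRemoteService annotation marks a service class as a remote service.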
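import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;

// @Target restricts where the annotation may appear.
// TYPE covers classes, interfaces (including annotation types), and enums.
@Target(ElementType.TYPE)
// @Retention controls how long the annotation is kept.
// RUNTIME keeps it available at run time, so it can be read through reflection.
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface GpRemoteService {
}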
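To make the role of RetentionPolicy.RUNTIME concrete, here is a minimal sketch that uses only the JDK. DemoRemoteService, GreetingService, GreetingServiceImpl, and PlainService are hypothetical stand-ins invented for this example (the Spring @Component meta-annotation is left out so the code runs without Spring). It shows the same isAnnotationPresent check that the provider side relies on.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @GpRemoteService, without the Spring meta-annotation.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoRemoteService {
}

interface GreetingService {
    String greet(String name);
}

@DemoRemoteService
class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// A class without the marker annotation.
class PlainService {
}

final class AnnotationCheckDemo {
    // Returns true when the object's class carries the marker annotation at run time.
    static boolean isRemoteService(Object bean) {
        return bean.getClass().isAnnotationPresent(DemoRemoteService.class);
    }

    public static void main(String[] args) {
        System.out.println(isRemoteService(new GreetingServiceImpl()));
        System.out.println(isRemoteService(new PlainService()));
    }
}
```

If the retention were SOURCE or CLASS instead, the reflective check would not see the annotation, which is why the original annotation is declared with RUNTIME.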

SpringRpcProviderBean
This class starts the NettyServer and stores the mapping of the beans to publish.
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;

    public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address = InetAddress.getLocalHost();
        this.serverAddress = address.getHostAddress();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
        // Start the Netty server on a separate thread.
        new Thread(() -> {
            try {
                new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,", e);
            }
        }).start();
    }

    // Called after each bean has been initialized.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Publish only the services that carry @GpRemoteService.
        if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                // Store the mapping of the bean to publish.
                // Key format: <name of the first implemented interface>.<method name>
                String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key, beanMethod);
            }
        }
        return bean;
    }
}

Mediator manages the published beans and their invocation.
BeanMethod
import java.lang.reflect.Method;
import lombok.Data;

@Data
public class BeanMethod {
    private Object bean;
    private Method method;
}

Mediator
It holds the published beans and invokes them through reflection.
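import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Mediator {

    // Key: <interface name>.<method name>; value: the bean and the method to invoke.
    public static Map<String, BeanMethod> beanMethodMap = new ConcurrentHashMap<>();

    private static volatile Mediator instance = null;

    private Mediator() {
    }

    // Lazily initialized singleton with double-checked locking.
    public static Mediator getInstance() {
        if (instance == null) {
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }

    // Looks up the target by key and invokes it reflectively.
    // Returns null when no bean is published under the key or when the call fails.
    public Object processor(RpcRequest rpcRequest) {
        String key = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
        BeanMethod beanMethod = beanMethodMap.get(key);
        if (beanMethod == null) {
            return null;
        }
        Object bean = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(bean, rpcRequest.getParams());
        } catch (IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}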
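The key scheme and the reflective call can be tried in isolation. The following is a minimal sketch that uses only the JDK; EchoService, EchoServiceImpl, and DispatchDemo are hypothetical names invented for the example. One deliberate variation from the code above: it iterates over the methods of the interface rather than the declared methods of the implementation class, so private helper methods are not published. Note that a key made of interface name plus method name cannot tell overloaded methods apart, in this sketch as in the original.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return "echo:" + text;
    }
}

final class DispatchDemo {
    // Pairs a target object with one of its methods (plays the role of BeanMethod).
    private static final class Target {
        final Object bean;
        final Method method;

        Target(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    // Key: <interface name>.<method name>
    private static final Map<String, Target> TARGETS = new ConcurrentHashMap<>();

    // Publishes every method of the first interface implemented by the bean.
    static void publish(Object bean) {
        Class<?> iface = bean.getClass().getInterfaces()[0];
        for (Method method : iface.getMethods()) {
            TARGETS.put(iface.getName() + "." + method.getName(), new Target(bean, method));
        }
    }

    // Invokes the published method; returns null when nothing is published under the key.
    static Object dispatch(String className, String methodName, Object... params) {
        Target target = TARGETS.get(className + "." + methodName);
        if (target == null) {
            return null;
        }
        try {
            return target.method.invoke(target.bean, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed", e);
        }
    }

    public static void main(String[] args) {
        publish(new EchoServiceImpl());
        System.out.println(dispatch(EchoService.class.getName(), "echo", "hi"));
    }
}
```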

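RpcServerProperties
Defines the configuration properties.
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {
    private int servicePort;
}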

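RpcProviderAutoConfiguration
Defines the auto-configuration class.
import java.net.UnknownHostException;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
    }
}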

Modifying RpcServerHandler
Change how the call is made: delegate directly to Mediator.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
        // Reuse the request header (it carries the request id) and mark it as a response.
        Header header = msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        // This is the main change: the call now goes through Mediator.
        Object result = Mediator.getInstance().processor(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response = new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);
        ctx.writeAndFlush(resProtocol);
    }
}

netty-rpc-provider
Two parts of this module change:
  • application.properties
  • NettyRpcProviderMain
NettyRpcProviderMain
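import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan(basePackages = {"com.example.spring.annotation", "com.example.spring.service", "com.example.service"})
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        // The manual instantiation from the previous version is removed.
    }
}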

application.properties
Add one configuration property.
gp.rpc.servicePort=20880

UserServiceImpl
Publish the current service.
import lombok.extern.slf4j.Slf4j;

@GpRemoteService // Publishes this service as a remote service.
@Slf4j
public class UserServiceImpl implements IUserService {

    @Override
    public String saveUser(String name) {
        log.info("begin saveUser:{}", name);
        return "Save User Success!";
    }
}

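Adding annotation support on the client
The client should also reference services through an annotation, so that the details of remote communication are hidden completely. The code structure is shown in Figure 7-2.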
Figure 7-2
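Adding the client annotation
Create the following annotation in the annotation directory of the netty-rpc-protocol module.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Autowired;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
// Meta-annotated with @Autowired so that Spring injects the annotated field.
@Autowired
public @interface GpRemoteReference {
}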

SpringRpcReferenceBean
Defines a FactoryBean that builds the proxy used for remote communication.
import java.lang.reflect.Proxy;
import org.springframework.beans.factory.FactoryBean;

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
    private String serviceAddress;
    private int servicePort;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    // Init method of the bean: builds a JDK dynamic proxy for the remote interface.
    public void init() {
        this.object = Proxy.newProxyInstance(
                this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(this.serviceAddress, this.servicePort));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }
}
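The FactoryBean hands Spring a JDK dynamic proxy rather than a real implementation. As a minimal sketch of that mechanism on its own, the following JDK-only example creates a proxy whose handler simply describes the call instead of sending it over the network. PingService and ProxyDemo are hypothetical names invented for the example, and the handler is a placeholder for something like RpcInvokerProxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

interface PingService {
    String ping(String who);
}

final class ProxyDemo {
    // Creates a JDK dynamic proxy that routes every interface call to the handler.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    // A stand-in for the remote call: it only describes the invocation it received.
    static PingService fakeRemotePing() {
        InvocationHandler handler = (proxy, method, args) -> {
            Object firstArg = (args == null || args.length == 0) ? "" : args[0];
            return method.getDeclaringClass().getSimpleName() + "." + method.getName() + "(" + firstArg + ")";
        };
        return createProxy(PingService.class, handler);
    }

    public static void main(String[] args) {
        System.out.println(fakeRemotePing().ping("Mic"));
    }
}
```

The caller only sees the PingService interface. Everything a real handler needs to build a request (declaring class, method name, arguments) arrives through the invoke parameters, which is what makes the remote call transparent to the caller.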

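SpringRpcReferencePostProcessor
Injects dynamic proxies for remote beans. It implements three interfaces:
  • BeanClassLoaderAware: obtains the class loader used for beans
  • BeanFactoryPostProcessor: runs after the Spring container has loaded the bean definitions and before any bean is instantiated
  • ApplicationContextAware: obtains the ApplicationContext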
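import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    // Bean definitions of the remote references found, keyed by field name.
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            // Walk every bean definition, resolve its class, and check each field for @GpRemoteReference.
            // A field that carries the annotation needs a dynamic proxy.
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                // Equivalent to ClassUtils.forName, which resolveClassName calls internally.
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                // Create a bean definition for each annotated field of this class.
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }
        // Register a proxy-producing bean for every @GpRemoteReference field.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
            if (context.containsBean(beanName)) {
                log.warn("SpringContext already register bean {}", beanName);
                return;
            }
            // Register the dynamically created bean definition with the container.
            registry.registerBeanDefinition(beanName, beanDefinition);
            log.info("registered RpcReferenceBean {} success.", beanName);
        });
    }

    private void parseRpcReference(Field field) {
        GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
        if (gpRemoteReference != null) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass", field.getType());
            builder.addPropertyValue("serviceAddress", clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort", clientProperties.getServicePort());
            BeanDefinition beanDefinition = builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
        }
    }
}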
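The field scan at the heart of this post-processor can be illustrated without Spring. The sketch below, which assumes nothing beyond the JDK, collects the name and type of every field that carries a marker annotation. DemoRemoteReference, OrderService, DemoController, and FieldScanDemo are hypothetical names invented for the example; the real code uses ReflectionUtils.doWithFields and AnnotationUtils.getAnnotation instead of the manual loop.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @GpRemoteReference, without the Spring meta-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface DemoRemoteReference {
}

interface OrderService {
}

class DemoController {
    @DemoRemoteReference
    private OrderService orderService;

    private String unrelated;
}

final class FieldScanDemo {
    // Returns field name -> field type for every annotated field, including inherited ones.
    static Map<String, Class<?>> findReferences(Class<?> clazz) {
        Map<String, Class<?>> found = new LinkedHashMap<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (field.isAnnotationPresent(DemoRemoteReference.class)) {
                    // The field name doubles as the bean name, as in the post-processor above.
                    found.putIfAbsent(field.getName(), field.getType());
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findReferences(DemoController.class).keySet());
    }
}
```

Because the field name is used as the key, two annotated fields with the same name in different classes would map to a single entry, even if their types differ. The same holds for the rpcRefBeanDefinitions map in the post-processor, so field names for remote references should be unique across the application.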

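Add an INIT_METHOD_NAME constant to RpcConstant.
public class RpcConstant {
    // Total number of bytes in the header.
    public static final int HEAD_TOTAL_LEN = 16;
    // Magic number.
    public static final short MAGIC = 0xca;
    // Name of the init method that Spring calls on SpringRpcReferenceBean.
    public static final String INIT_METHOD_NAME = "init";
}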

RpcClientProperties
import lombok.Data;

@Data
public class RpcClientProperties {
    private String serviceAddress = "192.168.1.102";
    private int servicePort = 20880;
}

RpcRefernceAutoConfiguration
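import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware {

    private Environment environment;

    @Bean
    public SpringRpcReferencePostProcessor postProcessor() {
        // Read the provider address and port from the environment.
        String address = environment.getProperty("gp.serviceAddress");
        int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc = new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        return new SpringRpcReferencePostProcessor(rc);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}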

netty-rpc-consumer
Modify the netty-rpc-consumer module:
  • Turn the module into a Spring Boot project
  • Add the web dependency
  • Add a test class
Figure 7-3: the netty-rpc-consumer module
Add the jar dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

HelloController
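import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    // Injected with a dynamic proxy that performs the remote call.
    @GpRemoteReference
    private IUserService userService;

    @GetMapping("/test")
    public String test() {
        return userService.saveUser("Mic");
    }
}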

NettyConsumerMain
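import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan(basePackages = {"com.example.spring.annotation", "com.example.controller", "com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {

    public static void main(String[] args) {
        SpringApplication.run(NettyConsumerMain.class, args);
    }
}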

application.properties
gp.serviceAddress=192.168.1.102
gp.servicePort=20880
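The key names matter here because RpcRefernceAutoConfiguration looks up gp.serviceAddress and gp.servicePort by exact name, and Integer.parseInt throws a NumberFormatException when it is handed null for a missing key. The following is a minimal JDK-only sketch of a more defensive lookup. ClientConfigDemo, its parse and readPort helpers, and the fallback value are all assumptions made for the illustration; they are not part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ClientConfigDemo {
    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads an integer property, falling back to defaultPort when the key is missing or blank.
    static int readPort(Properties props, String key, int defaultPort) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultPort;
        }
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties ok = parse("gp.serviceAddress=192.168.1.102\ngp.servicePort=20880");
        System.out.println(readPort(ok, "gp.servicePort", -1));
    }
}
```

Whether to fall back to a default or to fail at startup with a clear message is a design choice; failing fast is often easier to debug in a real deployment.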

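Testing access
  • Start Netty-Rpc-Server
  • Start Netty-Rpc-Consumer
If both start without problems, you can call HelloController to test access to the remote service.
Introducing a registry
Create a netty-rpc-registry module. The code structure is shown in Figure 7-4.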
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>

IRegistryService
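public interface IRegistryService {

    /**
     * Registers a service.
     * @param serviceInfo the service to register
     * @throws Exception if registration fails
     */
    void register(ServiceInfo serviceInfo) throws Exception;

    /**
     * Unregisters a service.
     * @param serviceInfo the service to unregister
     * @throws Exception if unregistration fails
     */
    void unRegister(ServiceInfo serviceInfo) throws Exception;

    /**
     * Discovers a service dynamically.
     * @param serviceName the name of the service to look up
     * @return one provider instance of the service, or null if none is found
     * @throws Exception if the lookup fails
     */
    ServiceInfo discovery(String serviceName) throws Exception;
}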

ServiceInfo
import lombok.Data;

@Data
public class ServiceInfo {
    private String serviceName;
    private String serviceAddress;
    private int servicePort;
}

ZookeeperRegistryService
import java.util.ArrayList;
import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

@Slf4j
public class ZookeeperRegistryService implements IRegistryService {

    private static final String REGISTRY_PATH = "/registry";

    // Curator's service discovery extension abstracts ServiceInstance, ServiceProvider,
    // and ServiceDiscovery, which makes service discovery straightforward to implement.
    private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

    private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

    public ZookeeperRegistryService(String registryAddress) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
        // The Curator client must be started before it is used.
        client.start();
        JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(serializer)
                .basePath(REGISTRY_PATH)
                .build();
        this.serviceDiscovery.start();
        loadBalance = new RandomLoadBalance();
    }

    @Override
    public void register(ServiceInfo serviceInfo) throws Exception {
        log.info("begin registering service, {}", serviceInfo);
        ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceInfo serviceInfo) throws Exception {
        ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceInfo discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceInfo>> serviceInstances = serviceDiscovery
                .queryForInstances(serviceName);
        // Let the load balancer pick one concrete instance.
        // Copy into a list instead of casting, since only a Collection is guaranteed.
        ServiceInstance<ServiceInfo> serviceInstance = loadBalance.select(new ArrayList<>(serviceInstances));
        if (serviceInstance != null) {
            return serviceInstance.getPayload();
        }
        return null;
    }
}
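To see the register, unregister, and discover semantics without a running ZooKeeper, here is a minimal in-memory sketch built only on the JDK. DemoServiceInfo and InMemoryRegistry are hypothetical names invented for this example and do not implement IRegistryService. Unlike the ZooKeeper version, discover returns every instance and leaves the choice to the caller.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified stand-in for ServiceInfo.
final class DemoServiceInfo {
    final String serviceName;
    final String serviceAddress;
    final int servicePort;

    DemoServiceInfo(String serviceName, String serviceAddress, int servicePort) {
        this.serviceName = serviceName;
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }
}

// Hypothetical in-memory registry: service name -> list of provider instances.
final class InMemoryRegistry {
    private final Map<String, List<DemoServiceInfo>> instances = new ConcurrentHashMap<>();

    void register(DemoServiceInfo info) {
        instances.computeIfAbsent(info.serviceName, k -> new CopyOnWriteArrayList<>()).add(info);
    }

    // Removes the instance with the same address and port, if present.
    void unRegister(DemoServiceInfo info) {
        List<DemoServiceInfo> list = instances.get(info.serviceName);
        if (list != null) {
            list.removeIf(i -> i.serviceAddress.equals(info.serviceAddress) && i.servicePort == info.servicePort);
        }
    }

    // Returns a snapshot of all instances registered under the service name.
    List<DemoServiceInfo> discover(String serviceName) {
        List<DemoServiceInfo> list = instances.get(serviceName);
        if (list == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(list);
    }
}
```

A stand-in like this can be handy in unit tests of the client side, where starting ZooKeeper would be excessive.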

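Introducing a load-balancing algorithm
Service discovery may return more than one provider instance for a service, so a load-balancing algorithm is needed to choose one of them.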
ILoadBalance
import java.util.List;

public interface ILoadBalance<T> {

    // Selects one server from the candidates.
    T select(List<T> servers);
}

AbstractLoadBalance
import java.util.List;
import org.apache.curator.x.discovery.ServiceInstance;

public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) {
        // No candidates: nothing to select.
        if (servers == null || servers.size() == 0) {
            return null;
        }
        // A single candidate needs no balancing.
        if (servers.size() == 1) {
            return servers.get(0);
        }
        return doSelect(servers);
    }

    // Subclasses implement the actual selection strategy.
    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}

RandomLoadBalance
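import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.curator.x.discovery.ServiceInstance;

public class RandomLoadBalance extends AbstractLoadBanalce {

    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
        int length = servers.size();
        // ThreadLocalRandom avoids creating a new Random instance on every call.
        return servers.get(ThreadLocalRandom.current().nextInt(length));
    }
}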
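Random selection is only one possible strategy. As an illustration of how another strategy could plug into the same kind of interface, here is a minimal round-robin sketch using only the JDK. DemoLoadBalance is a local, hypothetical copy of the ILoadBalance shape, and RoundRobinLoadBalance is not part of the original project.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Local copy of the ILoadBalance shape so the example is self-contained.
interface DemoLoadBalance<T> {
    T select(List<T> servers);
}

// Hypothetical round-robin strategy: hands out servers in turn.
final class RoundRobinLoadBalance<T> implements DemoLoadBalance<T> {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public T select(List<T> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

Round-robin spreads requests evenly when the list of servers is stable, while random selection needs no shared counter. Either can be swapped in where the registry creates its load balancer.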

RegistryType
public enum RegistryType {

    ZOOKEEPER((byte) 0),
    EUREKA((byte) 1);

    private final byte code;

    RegistryType(byte code) {
        this.code = code;
    }

    public byte code() {
        return this.code;
    }

    // Returns the type with the given code, or null if the code is unknown.
    public static RegistryType findByCode(byte code) {
        for (RegistryType rt : RegistryType.values()) {
            if (rt.code() == code) {
                return rt;
            }
        }
        return null;
    }
}

RegistryFactory
public class RegistryFactory {

    // Creates the registry implementation for the given type.
    // Returns null if the implementation cannot be created.
    public static IRegistryService createRegistryService(String address, RegistryType registryType) {
        IRegistryService registryService = null;
        try {
            switch (registryType) {
                case ZOOKEEPER:
                    registryService = new ZookeeperRegistryService(address);
                    break;
                case EUREKA:
                    // TODO
                    break;
                default:
                    registryService = new ZookeeperRegistryService(address);
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return registryService;
    }
}
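One detail worth noting: RegistryType.findByCode returns null for an unknown code, and a Java switch on a null enum reference throws a NullPointerException. In the factory above that exception is caught and printed, so the caller receives a null registry and fails later. A possible alternative, sketched here under the assumption that failing fast is acceptable, is to reject unknown codes at lookup time. DemoRegistryType is a hypothetical copy of the enum made for this example.

```java
// Hypothetical variant of RegistryType whose lookup fails fast on unknown codes.
enum DemoRegistryType {
    ZOOKEEPER((byte) 0),
    EUREKA((byte) 1);

    private final byte code;

    DemoRegistryType(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    // Throws instead of returning null, so a bad configuration value surfaces immediately.
    static DemoRegistryType findByCode(byte code) {
        for (DemoRegistryType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown registry type code: " + code);
    }
}
```

With this variant a mistyped registry type in the configuration produces an error that names the offending value, rather than a NullPointerException somewhere downstream.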

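Modifying the server to add service registration
Modify the netty-rpc-protocol module to add registry support.
SpringRpcProviderBean
The parts marked with "Changed" comments below are the ones to modify.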
@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;
    private final IRegistryService registryService; // Changed: registry support added.

    public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address = InetAddress.getLocalHost();
        this.serverAddress = address.getHostAddress();
        this.registryService = registryService; // Changed: registry support added.
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
        new Thread(() -> {
            try {
                new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,", e);
            }
        }).start();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Publish only the services that carry @GpRemoteService.
        if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
            String serviceName = bean.getClass().getInterfaces()[0].getName();
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                String key = serviceName + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key, beanMethod);
            }
            try {
                // Changed: register the service with the registry.
                // Registration happens once per service, outside the method loop,
                // so a service with several methods is not registered several times.
                ServiceInfo serviceInfo = new ServiceInfo();
                serviceInfo.setServiceAddress(this.serverAddress);
                serviceInfo.setServicePort(this.serverPort);
                serviceInfo.setServiceName(serviceName);
                registryService.register(serviceInfo);
            } catch (Exception e) {
                log.error("register service {} failed", serviceName, e);
            }
        }
        return bean;
    }
}

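RpcServerProperties
Modify RpcServerProperties to add the registry configuration.
@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {
    private int servicePort;
    // Registry type code, see RegistryType.
    private byte registerType;
    // Registry address in host:port form.
    private String registryAddress;
}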

RpcProviderAutoConfiguration
Add injection of the registry.
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        // Create the registry service from the configured address and type.
        IRegistryService registryService = RegistryFactory.createRegistryService(
                rpcServerProperties.getRegistryAddress(),
                RegistryType.findByCode(rpcServerProperties.getRegisterType()));
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
    }
}

application.properties
Modify application.properties in netty-rpc-provider.
gp.rpc.servicePort=20880
gp.rpc.registerType=0
gp.rpc.registryAddress=192.168.221.128:2181

Modifying the client to add service discovery
The client needs more changes. All of the classes modified below are in the netty-rpc-protocol module.
RpcClientProperties
Add options for the registry type and the registry address.
@Data
public class RpcClientProperties {
    private String serviceAddress = "192.168.1.102";
    private int servicePort = 20880;
    private byte registryType;
    private String registryAddress;
}

Modifying NettyClient
The address used to be static; it is now obtained from the registry.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClient {

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    /* Removed: the static address is no longer stored.
    private String serviceAddress;
    private int servicePort;
    */

    public NettyClient() {
        log.info("begin init NettyClient");
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
        /* Removed:
        this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;
        */
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {
        // Look up a provider for the requested interface in the registry.
        String serviceName = protocol.getContent().getClassName();
        ServiceInfo serviceInfo = registryService.discovery(serviceName);
        if (serviceInfo == null) {
            throw new IllegalStateException("no provider found for " + serviceName);
        }
        ChannelFuture future = bootstrap
                .connect(serviceInfo.getServiceAddress(), serviceInfo.getServicePort())
                .sync();
        future.addListener(listener -> {
            if (future.isSuccess()) {
                log.info("connect rpc server {} success.", serviceInfo.getServiceAddress());
            } else {
                log.error("connect rpc server {} failed .", serviceInfo.getServiceAddress());
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

Modifying RpcInvokerProxy
Replace the static IP address and port with an IRegistryService.
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

    /* Removed:
    private String serviceAddress;
    private int servicePort;
    */
    IRegistryService registryService;

    public RpcInvokerProxy(IRegistryService registryService) {
        /* Removed:
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
        */
        this.registryService = registryService;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        // Assemble the request.
        RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
        long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
        Header header = new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(),
                ReqType.REQUEST.code(), requestId, 0);
        protocol.setHeader(header);
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // Send the request.
        NettyClient nettyClient = new NettyClient();
        // Set up asynchronous handling of the response, keyed by request id.
        RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId, future);
        nettyClient.sendRequest(protocol, this.registryService);
        // Block until the response arrives, then return its payload.
        return future.getPromise().get().getData();
    }
}
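The proxy pairs each outgoing request with its eventual response through a request id and a map of pending futures. The sketch below shows that correlation pattern in isolation. It swaps in the JDK's CompletableFuture for Netty's DefaultPromise, and PendingRequests is a hypothetical name standing in for RequestHolder, so treat it as an illustration of the idea rather than the project's code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RequestHolder: tracks requests that await a response.
final class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called by the sender: stores the future and returns the id to put in the header.
    long register(CompletableFuture<Object> future) {
        long id = nextId.incrementAndGet();
        pending.put(id, future);
        return id;
    }

    // Called when a response arrives: completes and forgets the matching future.
    // Returns false when no request with that id is waiting.
    boolean complete(long requestId, Object data) {
        CompletableFuture<Object> future = pending.remove(requestId);
        return future != null && future.complete(data);
    }

    int size() {
        return pending.size();
    }
}
```

Usage follows the flow in invoke: register a future, send the request with the returned id, and wait on the future, while the response handler calls complete with the id taken from the response header. Removing the entry on completion keeps the map from growing with every call.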

SpringRpcReferenceBean
Modify the reference bean to add the registry configuration.
public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
    /* Removed:
    private String serviceAddress;
    private int servicePort;
    */
    // Changed: registry settings added.
    private byte registryType;
    private String registryAddress;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init() {
        // Changed: the proxy now receives a registry service instead of a fixed address.
        IRegistryService registryService = RegistryFactory.createRegistryService(
                this.registryAddress, RegistryType.findByCode(this.registryType));
        this.object = Proxy.newProxyInstance(
                this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(registryService));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    /* Removed:
    public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }
    */

    public void setRegistryType(byte registryType) {
        this.registryType = registryType;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

SpringRpcReferencePostProcessor
@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    // Bean definitions of the remote references found, keyed by field name.
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            // Walk every bean definition, resolve its class, and check each field for @GpRemoteReference.
            // A field that carries the annotation needs a dynamic proxy.
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }
        // Register a proxy-producing bean for every @GpRemoteReference field.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
            if (context.containsBean(beanName)) {
                log.warn("SpringContext already register bean {}", beanName);
                return;
            }
            registry.registerBeanDefinition(beanName, beanDefinition);
            log.info("registered RpcReferenceBean {} success.", beanName);
        });
    }

    private void parseRpcReference(Field field) {
        GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
        if (gpRemoteReference != null) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass", field.getType());
            /* Removed:
            builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());
            */
            // Changed: pass the registry settings to the reference bean.
            builder.addPropertyValue("registryType", clientProperties.getRegistryType());
            builder.addPropertyValue("registryAddress", clientProperties.getRegistryAddress());
            BeanDefinition beanDefinition = builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
        }
    }
}

RpcRefernceAutoConfiguration
@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware {

    private Environment environment;

    @Bean
    public SpringRpcReferencePostProcessor postProcessor() {
        String address = environment.getProperty("gp.serviceAddress");
        int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc = new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        // Registry type and address are read from the environment as well.
        rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
        rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
        return new SpringRpcReferencePostProcessor(rc);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

application.properties
Modify the configuration in the netty-rpc-consumer module.
gp.serviceAddress=192.168.1.102
gp.servicePort=20880
gp.registryType=0
gp.registryAddress=192.168.221.128:2181

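Testing load balancing
Add another provider startup class and change its port. Then refresh the browser, without restarting the client, to see load balancing take effect.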
Figure 7-5
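To get a feel for what repeated browser refreshes should show, the random strategy can be simulated offline. The following JDK-only sketch counts how often each of two provider addresses is picked. RandomPickSimulation is a hypothetical helper, the second port (20881) is an assumed value for the extra provider, and the seed is fixed only to make runs repeatable.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class RandomPickSimulation {
    // Counts how often each address is chosen over the given number of simulated requests.
    static Map<String, Integer> simulate(List<String> addresses, int requests, long seed) {
        Random random = new Random(seed);
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String address : addresses) {
            counts.put(address, 0);
        }
        for (int i = 0; i < requests; i++) {
            String picked = addresses.get(random.nextInt(addresses.size()));
            counts.merge(picked, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The second port is an assumption for the additional provider instance.
        List<String> providers = Arrays.asList("192.168.1.102:20880", "192.168.1.102:20881");
        System.out.println(simulate(providers, 1000, 42L));
    }
}
```

With random selection the split between the two providers is roughly even over many requests, but consecutive requests can land on the same provider, so a few refreshes in a row hitting one instance is expected behavior.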
If you need the source code, follow the WeChat official account [跟着Mic学架构] and reply with the keyword [rpc] to get it.
