面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)

目标 附面试思维导图:
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


  • 能够描述项目数据迁移的方案
  • 了解hbase的特点
  • 能够熟悉数据迁移中的数据包装和转换
  • 能够完成文章数据的全量和增量迁移
  • 能够完成热点文章数据的迁移
1 为什么需要自动同步 因为MySQL保存着我们爬取的以及自建的数据,对于爬取的数据,数据量比较大,使用mysql 存储会影响mysql的性能,并且我们需要对数据进行流式计算,对数据进行各种统计,mysq满足不了我们的需求,我们就将mysql中的全量数据同步到HBASE中,由HBASE保存海量数据,mysql中的全量数据会定期进行删除。
HBASE中保存着海量数据,我们需要计算出热点数据,并将数据同步到mysql以及MONGODB中,mysql中保存主体关系数据,MONGODB保存着具体数据信息。
因为热点数据也会失效,今天是热数据,明天就不是了,也需要定期对热点数据进行删除,我们定时删除一个月之前的热点数据,保持本月的热数据。
2 迁移方案 2.1 需求分析 2.1.1 功能需求 有了大量数据集基础后,实时计算后的热点数据需要保存起来,因为mysql保存大量文章数据会影响mysql的性能,所以采用mysql+mongoDB的方式进行存储。
2.1.1 全量数据迁移方案 通过定时任务将mysql中爬取或者自建的文章同步到HBASE中,并将同步过的数据状态改为已同步,下次同步的时候就不会再次同步这些数据了。
2.1.2 热数据迁移方案 HBASE中有全量数据,大数据端计算出热点数据,需要将这些热点数据同步到MYSQL和MONGDB中,用于页面显示
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


2.2 设计思路 将mysql数据库中的全量数据定时读取出来,将多个对象打包成一个对象,保存到HBASE中,保存成功后更新数据库中的状态改为已同步,下一次就不会同步该条数据了。
使用KAFKA监听热点数据计算结果,接收到热点数据信息后,从HBASE得到打包的数据,并将数据进行拆分,将关系数据保存到mysql中,将具体数据保存到mongodb中。
因为热点数据会失效,定期清除mysql和mongodb中的过期数据
2.3 数据同步注意的问题 HBASE数据主要靠rowKey进行查询的,rowKey设计就用mysql中的主键ID作为rowKey,查询的时候直接根据Rowkey获取数据
因为需要同步到HBASE的数据是多个数据表的数据,一条数据由多个对象组成,存储的时候使用列族区分不同的对象,里面存储不同的字段。
3 项目中集成hbase与Mongodb 在leadnews-common 集成:
  • hbase.properties
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


  • mongo.properties
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


  • pom.xml
org.springframework.boot spring-boot-starter-data-mongodb org.apache.hbase hbase-client 2.1.5 org.slf4j slf4j-api log4j log4j junit junit slf4j-log4j12 org.slf4j

host文件配置:
在服务器host文件中配置域名,根据自己的服务器地址更改
172.16.1.52 javaedge

4 常用组件介绍 4.1 Hbase相关操作 Hbase 操作工具类用于将数据存储到Hbase中,其中有些方法用于存储或删除。
4.1.1 项目导入 导入资料文件夹中的项目leadnews-migration
4.1.2 公共存储类 StorageData
公共存储数据表,由多个StorageEntity组成
StorageData 是最重要的一个存储对象,它是保存一个bean信息的类,负责存储bean信息以及转换和反向转换bean 。
该类用到一个重要的工具类ReflectUtils 反射工具类和DataConvertUtils数据类型转换工具类主要用于日期类型的转换
主要方法
  • 添加StorageEntry方法
public void addStorageEntry(StorageEntry entry)

该方法有几个重载方法,用于向StorageEntry列表中添加StorageEntry对象的
  • 获取该对象对应的Object对象
public Object getObjectValue()

该方法用于将存储的实体数据转换为Bean的实体,用了ReflectUtils反射工具类进行操作
  • 将Bean 转换为StorageData的存储结构
public static StorageData getStorageData(Object bean)

该方法用于将不同的bean转换为同一种存储结构进行存储
StorageEntity 公共代码存储的实体
StorageEntry 公共存储对象的一个key-value的字段
4.1.3 Hbase操作相关工具类 (1)HBaseConstants 类
配置类Hbase存储的的表名称
public class HBaseConstants { public static final String APARTICLE_QUANTITY_TABLE_NAME = "APARTICLE_QUANTITY_TABLE_NAME"; }

(2)HBaseInvok
hbase的的回调操作类 /** * Hbase 的回调类 * 用于我们操作的时候就行回调 */ public interface HBaseInvok { /** * 回调方法 */ public void invok(); }

(3)HBaseStorage
hbase 的存储对象 继承自StorageEntity
(4)HBaseClent
hbase client操作的工具类
(5)HBaseConfig
用于将HbaseClient对象的相关配置
(6)HBaseStorageClient
Hbase 存储客户端工具类 是对HbaseClient工具类的封装
这个类是自己封装的存储客户端
该类位于heima-leadnews-common 包下的
com.heima.hbase.HBaseStorageClient
其中用到了HBaseClent 客户端工具,它是一个操作工具类,不需要我们具体地写拿过来用就可以
4.1.4 测试代码
@SpringBootTest @RunWith(SpringRunner.class) public class HbaseTest {@Autowired private HBaseClent hBaseClent; @Test public void testCreateTable(){List columnFamily = new ArrayList<>(); columnFamily.add("test_cloumn_family1"); columnFamily.add("test_cloumn_family2"); boolean ret = hBaseClent.creatTable("hbase_test_table_name", columnFamily); }@Test public void testDelTable(){ hBaseClent.deleteTable("hbase_test_table_name"); }@Test public void testSaveData(){ String []columns ={"name","age"}; String [] values = {"zhangsan","28"}; hBaseClent.putData("hbase_test_table_name","test_row_key_001","test_cloumn_family1",columns,values); }@Test public void testFindByRowKey(){ Result hbaseResult = hBaseClent.getHbaseResult("hbase_test_table_name", "test_row_key_001"); System.out.println(hbaseResult); } }

4.2 MongoDB操作工具类 mongoDB是一个文档型数据库,也需要存储多个不同的对象,我们也用到了HBASE中用到的StorageEntity 存储结构,我们下面会讲
我们用到了Spring MongoTemplate 来操作数据库
介绍一下我们的实体
(1)MongoConstant
【面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)】mongoDB操作的常量定义了操作mongodb的表名称
代码位置:
com.heima.common.mongo.constants.MongoConstant
public class MongoConstant { public static final String APARTICLE_MIGRATION_TABLE = "APARTICLE_MIGRATION_TABLE"; }

(2)MongoStorageEntity
MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的
mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型
代码位置:
com.heima.common.mongo.entity.MongoStorageEntity
(2)MongoStorageEntityMongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型代码位置:com.heima.common.mongo.entity.MongoStorageEntity

(3)MongoDBconfigure
对mongdb操作的配置类
代码位置:
com.heima.common.mongo.MongoDBconfigure
@Configuration @PropertySource("classpath:mongo.properties") public class MongoDBconfigure { @Value("${mongo.host}") private String host; @Value("${mongo.port}") private int port; @Value("${mongo.dbname}") private String dbName; @Bean public MongoTemplate getMongoTemplate() { return new MongoTemplate(getSimpleMongoDbFactory()); } public SimpleMongoDbFactory getSimpleMongoDbFactory() { return new SimpleMongoDbFactory(new MongoClient(host, port), dbName); } }

(4)测试代码
@SpringBootTest(classes = MigrationApplication.class) @RunWith(SpringJUnit4ClassRunner.class) public class MongoTest {@Autowired private MongoTemplate mongotemplate; @Autowired private HBaseStorageClient hBaseStorageClient; @Test public void test() { Class[] classes = new Class[]{ApArticle.class, ApArticleContent.class, ApAuthor.class}; //List entityList = hBaseStorageClient.getHbaseDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", Arrays.asList(classes)); List strList = Arrays.asList(classes).stream().map(x -> x.getName()).collect(Collectors.toList()); List storageDataList = hBaseStorageClient.gethBaseClent().getStorageDataList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", strList); MongoStorageEntity mongoStorageEntity = new MongoStorageEntity(); mongoStorageEntity.setDataList(storageDataList); mongoStorageEntity.setRowKey("1"); MongoStorageEntity tmp = mongotemplate.findById("1", MongoStorageEntity.class); if (null != tmp) { mongotemplate.remove(tmp); } MongoStorageEntity tq = mongotemplate.insert(mongoStorageEntity); System.out.println(tq); }@Test public void test1() { MongoStorageEntity mongoStorageEntity = mongotemplate.findById("1", MongoStorageEntity.class); if (null != mongoStorageEntity && null != mongoStorageEntity.getDataList()) { mongoStorageEntity.getDataList().forEach(x -> { System.out.println(x.getObjectValue()); }); } } }
5 业务层代码 5.1 Habse操作实体类 (1)ArticleCallBack
Hbase相关回调操作的工具类
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


(2)ArticleHBaseInvok
Hbase 对回调对象的封装,以及对回调的invoke执行对象
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片



(3)ArticleQuantity
对整个需要存储的对象的封装。
5.2 文章配置接口 5.2.1 mapper ApArticleConfigMapper中新增方法
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片



ApArticleConfigMapper.xml
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


5.2.2 service 对文章配置操作的service
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片




ApArticleConfigServiceImpl是对ApArticleConfig的操作
面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)
文章图片


5.3 文章内容接口 5.3.1 mapper定义 ApArticleContentMapper新增方法
List selectByArticleIds(List articleIds);

5.3.2 service 对文章内容操作的Service
public interface ApArticleContenService { List queryByArticleIds(List ids); ApArticleContent getByArticleIds(Integer id); }

ApArticleContenServiceImpl
对ApArticleConten相关的操作
代码位置:
com.heima.migration.service.impl.ApArticleContenServiceImpl
@Service public class ApArticleContenServiceImpl implements ApArticleContenService {@Autowired private ApArticleContentMapper apArticleContentMapper; @Override public List queryByArticleIds(List ids) { return apArticleContentMapper.selectByArticleIds(ids); }@Override public ApArticleContent getByArticleIds(Integer id) { return apArticleContentMapper.selectByArticleId(id); } }

7.5 文章作者接口 7.5.1 mapper定义 ApAuthorMapper
List selectByIds(List ids);

ApAuthorMapper.xml
select * from ap_author where id in #{item}

7.5.2 service 对ApAuthor操作的Service
接口位置
:com.heima.migration.service.ApAuthorService
public interface ApAuthorService { List queryByIds(List ids); ApAuthor getById(Long id); }

ApAuthorServiceImpl
对ApAuthor相关的操作
代码位置
:com.heima.migration.service.impl.ApAuthorServiceImpl
@Service public class ApAuthorServiceImpl implements ApAuthorService { @Autowired private ApAuthorMapper apAuthorMapper; @Override public List queryByIds(List ids) { return apAuthorMapper.selectByIds(ids); } @Override public ApAuthor getById(Long id) { if (null != id) { return apAuthorMapper.selectById(id.intValue()); } return null; } }

7.6 综合迁移接口 ArticleQuantityService
操作ArticleQuantity对象的Service ArticleQuantity对象封装了文章相关的数据
接口位置:
com.heima.migration.service.ArticleQuantityService
public interface ArticleQuantityService { /** * 获取ArticleQuantity列表 * @return */ public List getArticleQuantityList(); /** * 根据ArticleId获取ArticleQuantity * @param id * @return */ public ArticleQuantity getArticleQuantityByArticleId(Long id); /** * 根据ByArticleId从Hbase中获取ArticleQuantity * @param id * @return */ public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id); /** * 数据库到Hbase的同步 */ public void dbToHbase(); /** * 根据articleId 将数据库的数据同步到Hbase * @param articleId */ public void dbToHbase(Integer articleId); }

ArticleQuantityServiceImpl
对ArticleQuantity的相关操作
代码位置:
com.heima.migration.service.impl.ArticleQuantityServiceImpl
/** * 查询未同步的数据,并封装成ArticleQuantity 对象 */ @Service @Log4j2 public class ArticleQuantityServiceImpl implements ArticleQuantityService {@Autowired private ApArticleContenService apArticleContenService; @Autowired private ApArticleConfigService apArticleConfigService; @Autowired private ApAuthorService apAuthorService; @Autowired private HBaseStorageClient hBaseStorageClient; @Autowired private ApArticleService apArticleService; /** * 查询位同步数据的列表 * * @return */ public List getArticleQuantityList() { log.info("生成ArticleQuantity列表"); //查询未同步的庶数据 List apArticleList = apArticleService.getUnsyncApArticleList(); if (apArticleList.isEmpty()) { return null; } //获取ArticleId 的list List apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList()); //获取AuthorId 的 list List apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList()); //根据apArticleIdList 批量查询出内容列表 List apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList); //根据apArticleIdList 批量查询出配置列表 List apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList); //根据apAuthorIdList 批量查询出作者列 List apAuthorList = apAuthorService.queryByIds(apAuthorIdList); //将不同的对象转换为 ArticleQuantity 对象 List articleQuantityList = apArticleList.stream().map(apArticle -> { return new ArticleQuantity() {{ //设置apArticle 对象 setApArticle(apArticle); // 根据apArticle.getId() 过滤出符合要求的 ApArticleContent 对象 List apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList()); if (null != apArticleContents && !apArticleContents.isEmpty()) { setApArticleContent(apArticleContents.get(0)); } // 根据 apArticle.getId 过滤出 ApArticleConfig 对象 List apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList()); if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) { setApArticleConfig(apArticleConfigs.get(0)); } // 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象 List apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList()); if (null != apAuthors && !apAuthors.isEmpty()) { setApAuthor(apAuthors.get(0)); } //设置回调方法 用户方法的回调 用于修改同步状态 插入Hbase 成功后同步状态改为已同步 setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x))); }}; }).collect(Collectors.toList()); if (null != articleQuantityList && !articleQuantityList.isEmpty()) { log.info("生成ArticleQuantity列表完成,size:{}", articleQuantityList.size()); } else { log.info("生成ArticleQuantity列表完成,size:{}", 0); }return articleQuantityList; }public ArticleQuantity getArticleQuantityByArticleId(Long id) { if (null == id) { return null; } ArticleQuantity articleQuantity = null; ApArticle apArticle = apArticleService.getById(id); if (null != apArticle) { articleQuantity = new ArticleQuantity(); articleQuantity.setApArticle(apArticle); ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue()); articleQuantity.setApArticleContent(apArticleContent); ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue()); articleQuantity.setApArticleConfig(apArticleConfig); ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId()); articleQuantity.setApAuthor(apAuthor); } return articleQuantity; }public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) { if (null == id) { return null; } ArticleQuantity articleQuantity = null; List typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class); List objectList = hBaseStorageClient.getStorageDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, DataConvertUtils.toString(id), typeList); if (null != objectList && !objectList.isEmpty()) { articleQuantity = new ArticleQuantity(); for (Object value : objectList) { if (value instanceof ApArticle) { articleQuantity.setApArticle((ApArticle) value); } else if (value instanceof ApArticleContent) { articleQuantity.setApArticleContent((ApArticleContent) value); } else if (value instanceof ApArticleConfig) { articleQuantity.setApArticleConfig((ApArticleConfig) value); } else if (value instanceof ApAuthor) { articleQuantity.setApAuthor((ApAuthor) value); } } } return articleQuantity; }/** * 数据库到Hbase同步 */ public void dbToHbase() { long cutrrentTime = System.currentTimeMillis(); List articleQuantitList = getArticleQuantityList(); if (null != articleQuantitList && !articleQuantitList.isEmpty()) { log.info("开始进行定时数据库到HBASE同步,筛选出未同步数据量:{}", articleQuantitList.size()); if (null != articleQuantitList && !articleQuantitList.isEmpty()) { List hbaseStorageList = articleQuantitList.stream().map(ArticleQuantity::getHbaseStorage).collect(Collectors.toList()); hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hbaseStorageList); } } else { log.info("定时数据库到HBASE同步为筛选出数据"); }log.info("定时数据库到HBASE同步结束,耗时:{}", System.currentTimeMillis() - cutrrentTime); }@Override public void dbToHbase(Integer articleId) { long cutrrentTime = System.currentTimeMillis(); log.info("开始进行异步数据库到HBASE同步,articleId:{}", articleId); if (null != articleId) { ArticleQuantity articleQuantity = getArticleQuantityByArticleId(articleId.longValue()); if (null != articleQuantity) { HBaseStorage hBaseStorage = articleQuantity.getHbaseStorage(); hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hBaseStorage); } } log.info("异步数据库到HBASE同步结束,articleId:{},耗时:{}", articleId, System.currentTimeMillis() - cutrrentTime); } }
7.7 热点文章接口 ApHotArticleService
对ApHotArticle操作Service
接口位置:
com.heima.migration.service.ApHotArticleService
public interface ApHotArticleService { List selectList(ApHotArticles apHotArticlesQuery); void insert(ApHotArticles apHotArticles); /** * 热数据 Hbase 同步 * * @param apArticleId */ public void hotApArticleSync(Integer apArticleId); void deleteById(Integer id); /** * 查询过期的数据 * * @return */ public List selectExpireMonth(); void deleteHotData(ApHotArticles apHotArticle); }

ApHotArticleServiceImpl
对ApHotArticle的相关操作
代码位置:
com.heima.migration.service.impl.ApHotArticleServiceImpl
/** * 热点数据操作Service 类 */ @Service @Log4j2 public class ApHotArticleServiceImpl implements ApHotArticleService {@Autowired private ApHotArticlesMapper apHotArticlesMapper; @Autowired private MongoTemplate mongoTemplate; @Autowired private ArticleQuantityService articleQuantityService; @Autowired private HBaseStorageClient hBaseStorageClient; @Override public List selectList(ApHotArticles apHotArticlesQuery) { return apHotArticlesMapper.selectList(apHotArticlesQuery); }/** * 根据ID删除 * * @param id */ @Override public void deleteById(Integer id) { log.info("删除热数据,apArticleId:{}", id); apHotArticlesMapper.deleteById(id); }/** * 查询一个月之前的数据 * * @return */ @Override public List selectExpireMonth() { return apHotArticlesMapper.selectExpireMonth(); }/** * 删除过去的热数据 * * @param apHotArticle */ @Override public void deleteHotData(ApHotArticles apHotArticle) { deleteById(apHotArticle.getId()); String rowKey = DataConvertUtils.toString(apHotArticle.getId()); hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey); MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class); if (null != mongoStorageEntity) { mongoTemplate.remove(mongoStorageEntity); } }/** * 插入操作 * * @param apHotArticles */ @Override public void insert(ApHotArticles apHotArticles) { apHotArticlesMapper.insert(apHotArticles); }/** * 热点数据同步方法 * * @param apArticleId */ @Override public void hotApArticleSync(Integer apArticleId) { log.info("开始将热数据同步,apArticleId:{}", apArticleId); ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId); if (null != articleQuantity) { //热点数据同步到DB中 hotApArticleToDBSync(articleQuantity); //热点数据同步到MONGO hotApArticleMongoSync(articleQuantity); log.info("热数据同步完成,apArticleId:{}", apArticleId); } else { log.error("找不到对应的热数据,apArticleId:{}", apArticleId); } }/** * 获取热数据的ArticleQuantity 对象 * * @param apArticleId * @return */ private ArticleQuantity getHotArticleQuantity(Integer apArticleId) { Long id = Long.valueOf(apArticleId); ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id); if (null == articleQuantity) { articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id); } return articleQuantity; }/** * 热数据 到数据库Mysql的同步 * * @param articleQuantity */ public void hotApArticleToDBSync(ArticleQuantity articleQuantity) { Integer apArticleId = articleQuantity.getApArticleId(); log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId); if (null == apArticleId) { log.error("apArticleId不存在无法进行同步"); return; } ApHotArticles apHotArticlesQuery = new ApHotArticles() {{ setArticleId(apArticleId); }}; List apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery); if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) { log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId); } else { ApHotArticles apHotArticles = articleQuantity.getApHotArticles(); apHotArticlesMapper.insert(apHotArticles); } log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId); }/** * 热数据向从Hbase到Mongodb同步 * * @param articleQuantity */ public void hotApArticleMongoSync(ArticleQuantity articleQuantity) { Integer apArticleId = articleQuantity.getApArticleId(); log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId); if (null == apArticleId) { log.error("apArticleId不存在无法进行同步"); return; } String rowKeyId = DataConvertUtils.toString(apArticleId); MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class); if (null != mongoStorageEntity) { log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId); } else { List storageDataList = articleQuantity.getStorageDataList(); if (null != storageDataList && !storageDataList.isEmpty()) { mongoStorageEntity = new MongoStorageEntity(); mongoStorageEntity.setDataList(storageDataList); mongoStorageEntity.setRowKey(rowKeyId); mongoTemplate.insert(mongoStorageEntity); }} log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId); } }

8 定时同步数据 8.1 全量数据从mysql同步到HBase
@Component @DisallowConcurrentExecution @Log4j2 /** * 全量数据从mysql 同步到HBase */ public class MigrationDbToHBaseQuartz extends AbstractJob {@Autowired private ArticleQuantityService articleQuantityService; @Override public String[] triggerCron() { /** * 2019/8/9 10:15:00 * 2019/8/9 10:20:00 * 2019/8/9 10:25:00 * 2019/8/9 10:30:00 * 2019/8/9 10:35:00 */ return new String[]{"0 0/5 * * * ?"}; }@Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { log.info("开始进行数据库到HBASE同步任务"); articleQuantityService.dbToHbase(); log.info("数据库到HBASE同步任务完成"); }}

8.2 定期删除过期的数据
/** * 定期删除过期的数据 */ @Component @Log4j2 public class MigrationDeleteHotDataQuartz extends AbstractJob {@Autowired private ApHotArticleService apHotArticleService; @Override public String[] triggerCron() { /** * 2019/8/9 22:30:00 * 2019/8/10 22:30:00 * 2019/8/11 22:30:00 * 2019/8/12 22:30:00 * 2019/8/13 22:30:00 */ return new String[]{"0 30 22 * * ?"}; }@Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { long cutrrentTime = System.currentTimeMillis(); log.info("开始删除数据库过期数据"); deleteExpireHotData(); log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime); }/** * 删除过期的热数据 */ public void deleteExpireHotData() { List apHotArticlesList = apHotArticleService.selectExpireMonth(); if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) { for (ApHotArticles apHotArticle : apHotArticlesList) { apHotArticleService.deleteHotData(apHotArticle); } } }}

9 消息接收同步数据 9.1 文章审核成功同步 9.1.1 消息发送 (1)消息名称定义及消息发送方法声明
maven_test.properties
kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test

kafka.properties
kafka.topic.article-audit-success=${kafka.topic.article-audit-success}


com.heima.common.kafka.KafkaTopicConfig新增属性
/** * 审核成功 */ String articleAuditSuccess;

com.heima.common.kafka.KafkaSender
/** * 发送审核成功消息 */ public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) { ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage(); temp.setData(message); this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp); }

(2)修改自动审核代码,爬虫和自媒体都要修改
在审核成功后,发送消息
爬虫
//文章审核成功 ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess(); articleAuditSuccess.setArticleId(apArticle.getId()); articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER); articleAuditSuccess.setChannelId(apArticle.getChannelId()); kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

自媒体
//文章审核成功 ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess(); articleAuditSuccess.setArticleId(apArticle.getId()); articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA); articleAuditSuccess.setChannelId(apArticle.getChannelId()); kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

9.1.2消息接收
/** * 热点文章监听类 */ @Component @Log4j2 public class MigrationAuditSucessArticleListener implements KafkaListener { /** * 通用转换mapper */ @Autowired ObjectMapper mapper; /** * kafka 主题 配置 */ @Autowired KafkaTopicConfig kafkaTopicConfig; @Autowired private ArticleQuantityService articleQuantityService; @Override public String topic() { return kafkaTopicConfig.getArticleAuditSuccess(); }/** * 监听消息 * * @param data * @param consumer */ @Override public void onMessage(ConsumerRecord data, Consumer consumer) { log.info("kafka接收到审核通过消息:{}", data); String value = https://www.it610.com/article/(String) data.value(); if (null != value) { ArticleAuditSuccessMessage message = null; try { message = mapper.readValue(value, ArticleAuditSuccessMessage.class); } catch (IOException e) { e.printStackTrace(); } ArticleAuditSuccess auto = message.getData(); if (null != auto) { //调用方法 将HBAESE中的热数据进行同步 Integer articleId = auto.getArticleId(); if (null != articleId) { articleQuantityService.dbToHbase(articleId); } } }} }

9.2 热点文章同步 创建监听类:
com.heima.migration.kafka.listener.MigrationHotArticleListener
/** * 热点文章监听类 */ @Component @Log4j2 public class MigrationHotArticleListener implements KafkaListener { /** * 通用转换mapper */ @Autowired ObjectMapper mapper; /** * kafka 主题 配置 */ @Autowired KafkaTopicConfig kafkaTopicConfig; /** * 热点文章service注入 */ @Autowired private ApHotArticleService apHotArticleService; @Override public String topic() { return kafkaTopicConfig.getHotArticle(); }/** * 监听消息 * * @param data * @param consumer */ @Override public void onMessage(ConsumerRecord data, Consumer consumer) { log.info("kafka接收到热数据同步消息:{}", data); String value = https://www.it610.com/article/(String) data.value(); if (null != value) { ApHotArticleMessage message = null; try { message = mapper.readValue(value, ApHotArticleMessage.class); } catch (IOException e) { e.printStackTrace(); } Integer articleId = message.getData().getArticleId(); if (null != articleId) { //调用方法 将HBAESE中的热数据进行同步 apHotArticleService.hotApArticleSync(articleId); } } } }

    推荐阅读