目标 附面试思维导图:
文章图片
- 能够描述项目数据迁移的方案
- 了解hbase的特点
- 能够熟悉数据迁移中的数据包装和转换
- 能够完成文章数据的全量和增量迁移
- 能够完成热点文章数据的迁移
HBASE中保存着海量数据,我们需要计算出热点数据,并将数据同步到mysql以及MONGODB中,mysql中保存主体关系数据,MONGODB保存着具体数据信息。
因为热点数据也会失效,今天是热数据,明天就不是了,也需要定期对热点数据进行删除,我们定时删除一个月之前的热点数据,保持本月的热数据。
2 迁移方案 2.1 需求分析 2.1.1 功能需求 有了大量数据集基础后,实时计算后的热点数据需要保存起来,因为mysql保存大量文章数据会影响mysql的性能,所以采用mysql+mongoDB的方式进行存储。
2.1.1 全量数据迁移方案 通过定时任务将mysql中爬取或者自建的文章同步到HBASE中,并将同步过的数据状态改为已同步,下次同步的时候就不会再次同步这些数据了。
2.1.2 热数据迁移方案 HBASE中有全量数据,大数据端计算出热点数据,需要将这些热点数据同步到MYSQL和MONGDB中,用于页面显示
文章图片
2.2 设计思路 将mysql数据库中的全量数据定时读取出来,将多个对象打包成一个对象,保存到HBASE中,保存成功后更新数据库中的状态改为已同步,下一次就不会同步该条数据了。
使用KAFKA监听热点数据计算结果,接收到热点数据信息后,从HBASE得到打包的数据,并将数据进行拆分,将关系数据保存到mysql中,将具体数据保存到mongodb中。
因为热点数据会失效,定期清除mysql和mongodb中的过期数据
2.3 数据同步注意的问题 HBASE数据主要靠rowKey进行查询的,rowKey设计就用mysql中的主键ID作为rowKey,查询的时候直接根据Rowkey获取数据
因为需要同步到HBASE的数据是多个数据表的数据,一条数据由多个对象组成,存储的时候使用列族区分不同的对象,里面存储不同的字段。
3 项目中集成hbase与Mongodb 在leadnews-common 集成:
- hbase.properties
文章图片
- mongo.properties
文章图片
- pom.xml
org.springframework.boot
spring-boot-starter-data-mongodb
org.apache.hbase
hbase-client
2.1.5
org.slf4j
slf4j-api
log4j
log4j
junit
junit
slf4j-log4j12
org.slf4j
host文件配置:
在服务器host文件中配置域名,根据自己的服务器地址更改
172.16.1.52 javaedge
4 常用组件介绍 4.1 Hbase相关操作 Hbase 操作工具类用于将数据存储到Hbase中,其中有些方法用于存储或删除。
4.1.1 项目导入 导入资料文件夹中的项目leadnews-migration
4.1.2 公共存储类 StorageData
公共存储数据表,由多个StorageEntity组成
StorageData 是最重要的一个存储对象,它是保存一个bean信息的类,负责存储bean信息以及转换和反向转换bean 。
该类用到一个重要的工具类ReflectUtils 反射工具类和DataConvertUtils数据类型转换工具类主要用于日期类型的转换
主要方法
- 添加StorageEntry方法
public void addStorageEntry(StorageEntry entry)
该方法有几个重载方法,用于向StorageEntry列表中添加StorageEntry对象的
- 获取该对象对应的Object对象
public Object getObjectValue()
该方法用于将存储的实体数据转换为Bean的实体,用了ReflectUtils反射工具类进行操作
- 将Bean 转换为StorageData的存储结构
public static StorageData getStorageData(Object bean)
该方法用于将不同的bean转换为同一种存储结构进行存储
StorageEntity 公共代码存储的实体
StorageEntry 公共存储对象的一个key-value的字段
4.1.3 Hbase操作相关工具类 (1)HBaseConstants 类
配置类Hbase存储的的表名称
public class HBaseConstants {
public static final String APARTICLE_QUANTITY_TABLE_NAME = "APARTICLE_QUANTITY_TABLE_NAME";
}
(2)HBaseInvok
hbase的的回调操作类
/**
* Hbase 的回调类
* 用于我们操作的时候就行回调
*/
public interface HBaseInvok {
/**
* 回调方法
*/
public void invok();
}
(3)HBaseStorage
hbase 的存储对象 继承自StorageEntity
(4)HBaseClent
hbase client操作的工具类
(5)HBaseConfig
用于将HbaseClient对象的相关配置
(6)HBaseStorageClient
Hbase 存储客户端工具类 是对HbaseClient工具类的封装
这个类是自己封装的存储客户端
该类位于heima-leadnews-common 包下的
com.heima.hbase.HBaseStorageClient
其中用到了HBaseClent 客户端工具,它是一个操作工具类,不需要我们具体地写拿过来用就可以
4.1.4 测试代码
@SpringBootTest
@RunWith(SpringRunner.class)
public class HbaseTest {@Autowired
private HBaseClent hBaseClent;
@Test
public void testCreateTable(){List columnFamily = new ArrayList<>();
columnFamily.add("test_cloumn_family1");
columnFamily.add("test_cloumn_family2");
boolean ret = hBaseClent.creatTable("hbase_test_table_name", columnFamily);
}@Test
public void testDelTable(){
hBaseClent.deleteTable("hbase_test_table_name");
}@Test
public void testSaveData(){
String []columns ={"name","age"};
String [] values = {"zhangsan","28"};
hBaseClent.putData("hbase_test_table_name","test_row_key_001","test_cloumn_family1",columns,values);
}@Test
public void testFindByRowKey(){
Result hbaseResult = hBaseClent.getHbaseResult("hbase_test_table_name", "test_row_key_001");
System.out.println(hbaseResult);
}
}
4.2 MongoDB操作工具类 mongoDB是一个文档型数据库,也需要存储多个不同的对象,我们也用到了HBASE中用到的StorageEntity 存储结构,我们下面会讲
我们用到了Spring MongoTemplate 来操作数据库
介绍一下我们的实体
(1)MongoConstant
【面试|阿里言(出乎意料,“字节跳动”居然是这么做数据迁移的)】mongoDB操作的常量定义了操作mongodb的表名称
代码位置:
com.heima.common.mongo.constants.MongoConstant
public class MongoConstant {
public static final String APARTICLE_MIGRATION_TABLE = "APARTICLE_MIGRATION_TABLE";
}
(2)MongoStorageEntity
MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的
mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型
代码位置:
com.heima.common.mongo.entity.MongoStorageEntity
(2)MongoStorageEntityMongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型代码位置:com.heima.common.mongo.entity.MongoStorageEntity
(3)MongoDBconfigure
对mongdb操作的配置类
代码位置:
com.heima.common.mongo.MongoDBconfigure
@Configuration
@PropertySource("classpath:mongo.properties")
public class MongoDBconfigure {
@Value("${mongo.host}")
private String host;
@Value("${mongo.port}")
private int port;
@Value("${mongo.dbname}")
private String dbName;
@Bean
public MongoTemplate getMongoTemplate() {
return new MongoTemplate(getSimpleMongoDbFactory());
}
public SimpleMongoDbFactory getSimpleMongoDbFactory() {
return new SimpleMongoDbFactory(new MongoClient(host, port), dbName);
}
}
(4)测试代码
@SpringBootTest(classes = MigrationApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MongoTest {@Autowired
private MongoTemplate mongotemplate;
@Autowired
private HBaseStorageClient hBaseStorageClient;
@Test
public void test() {
Class>[] classes = new Class>[]{ApArticle.class, ApArticleContent.class, ApAuthor.class};
//List
5 业务层代码 5.1 Habse操作实体类 (1)ArticleCallBack
Hbase相关回调操作的工具类
文章图片
(2)ArticleHBaseInvok
Hbase 对回调对象的封装,以及对回调的invoke执行对象
文章图片
(3)ArticleQuantity
对整个需要存储的对象的封装。
5.2 文章配置接口 5.2.1 mapper ApArticleConfigMapper中新增方法
文章图片
ApArticleConfigMapper.xml
文章图片
5.2.2 service 对文章配置操作的service
文章图片
ApArticleConfigServiceImpl是对ApArticleConfig的操作
文章图片
5.3 文章内容接口 5.3.1 mapper定义 ApArticleContentMapper新增方法
List selectByArticleIds(List articleIds);
5.3.2 service 对文章内容操作的Service
public interface ApArticleContenService {
List queryByArticleIds(List ids);
ApArticleContent getByArticleIds(Integer id);
}
ApArticleContenServiceImpl
对ApArticleConten相关的操作
代码位置:
com.heima.migration.service.impl.ApArticleContenServiceImpl
@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {@Autowired
private ApArticleContentMapper apArticleContentMapper;
@Override
public List queryByArticleIds(List ids) {
return apArticleContentMapper.selectByArticleIds(ids);
}@Override
public ApArticleContent getByArticleIds(Integer id) {
return apArticleContentMapper.selectByArticleId(id);
}
}
7.5 文章作者接口 7.5.1 mapper定义 ApAuthorMapper
List selectByIds(List ids);
ApAuthorMapper.xml
select * from ap_author
where id in
#{item}
7.5.2 service 对ApAuthor操作的Service
接口位置
:com.heima.migration.service.ApAuthorService
public interface ApAuthorService {
List queryByIds(List ids);
ApAuthor getById(Long id);
}
ApAuthorServiceImpl
对ApAuthor相关的操作
代码位置
:com.heima.migration.service.impl.ApAuthorServiceImpl
@Service
public class ApAuthorServiceImpl implements ApAuthorService {
@Autowired
private ApAuthorMapper apAuthorMapper;
@Override
public List queryByIds(List ids) {
return apAuthorMapper.selectByIds(ids);
}
@Override
public ApAuthor getById(Long id) {
if (null != id) {
return apAuthorMapper.selectById(id.intValue());
}
return null;
}
}
7.6 综合迁移接口 ArticleQuantityService
操作ArticleQuantity对象的Service ArticleQuantity对象封装了文章相关的数据
接口位置:
com.heima.migration.service.ArticleQuantityService
public interface ArticleQuantityService {
/**
* 获取ArticleQuantity列表
* @return
*/
public List getArticleQuantityList();
/**
* 根据ArticleId获取ArticleQuantity
* @param id
* @return
*/
public ArticleQuantity getArticleQuantityByArticleId(Long id);
/**
* 根据ByArticleId从Hbase中获取ArticleQuantity
* @param id
* @return
*/
public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id);
/**
* 数据库到Hbase的同步
*/
public void dbToHbase();
/**
* 根据articleId 将数据库的数据同步到Hbase
* @param articleId
*/
public void dbToHbase(Integer articleId);
}
ArticleQuantityServiceImpl
对ArticleQuantity的相关操作
代码位置:
com.heima.migration.service.impl.ArticleQuantityServiceImpl
/**
* 查询未同步的数据,并封装成ArticleQuantity 对象
*/
@Service
@Log4j2
public class ArticleQuantityServiceImpl implements ArticleQuantityService {@Autowired
private ApArticleContenService apArticleContenService;
@Autowired
private ApArticleConfigService apArticleConfigService;
@Autowired
private ApAuthorService apAuthorService;
@Autowired
private HBaseStorageClient hBaseStorageClient;
@Autowired
private ApArticleService apArticleService;
/**
* 查询位同步数据的列表
*
* @return
*/
public List getArticleQuantityList() {
log.info("生成ArticleQuantity列表");
//查询未同步的庶数据
List apArticleList = apArticleService.getUnsyncApArticleList();
if (apArticleList.isEmpty()) {
return null;
}
//获取ArticleId 的list
List apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList());
//获取AuthorId 的 list
List apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList());
//根据apArticleIdList 批量查询出内容列表
List apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList);
//根据apArticleIdList 批量查询出配置列表
List apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList);
//根据apAuthorIdList 批量查询出作者列
List apAuthorList = apAuthorService.queryByIds(apAuthorIdList);
//将不同的对象转换为 ArticleQuantity 对象
List articleQuantityList = apArticleList.stream().map(apArticle -> {
return new ArticleQuantity() {{
//设置apArticle 对象
setApArticle(apArticle);
// 根据apArticle.getId() 过滤出符合要求的 ApArticleContent 对象
List apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
if (null != apArticleContents && !apArticleContents.isEmpty()) {
setApArticleContent(apArticleContents.get(0));
}
// 根据 apArticle.getId 过滤出 ApArticleConfig 对象
List apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) {
setApArticleConfig(apArticleConfigs.get(0));
}
// 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象
List apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList());
if (null != apAuthors && !apAuthors.isEmpty()) {
setApAuthor(apAuthors.get(0));
}
//设置回调方法 用户方法的回调 用于修改同步状态 插入Hbase 成功后同步状态改为已同步
setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x)));
}};
}).collect(Collectors.toList());
if (null != articleQuantityList && !articleQuantityList.isEmpty()) {
log.info("生成ArticleQuantity列表完成,size:{}", articleQuantityList.size());
} else {
log.info("生成ArticleQuantity列表完成,size:{}", 0);
}return articleQuantityList;
}public ArticleQuantity getArticleQuantityByArticleId(Long id) {
if (null == id) {
return null;
}
ArticleQuantity articleQuantity = null;
ApArticle apArticle = apArticleService.getById(id);
if (null != apArticle) {
articleQuantity = new ArticleQuantity();
articleQuantity.setApArticle(apArticle);
ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue());
articleQuantity.setApArticleContent(apArticleContent);
ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue());
articleQuantity.setApArticleConfig(apArticleConfig);
ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId());
articleQuantity.setApAuthor(apAuthor);
}
return articleQuantity;
}public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) {
if (null == id) {
return null;
}
ArticleQuantity articleQuantity = null;
List typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class);
List
7.7 热点文章接口 ApHotArticleService
对ApHotArticle操作Service
接口位置:
com.heima.migration.service.ApHotArticleService
public interface ApHotArticleService {
List selectList(ApHotArticles apHotArticlesQuery);
void insert(ApHotArticles apHotArticles);
/**
* 热数据 Hbase 同步
*
* @param apArticleId
*/
public void hotApArticleSync(Integer apArticleId);
void deleteById(Integer id);
/**
* 查询过期的数据
*
* @return
*/
public List selectExpireMonth();
void deleteHotData(ApHotArticles apHotArticle);
}
ApHotArticleServiceImpl
对ApHotArticle的相关操作
代码位置:
com.heima.migration.service.impl.ApHotArticleServiceImpl
/**
* 热点数据操作Service 类
*/
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {@Autowired
private ApHotArticlesMapper apHotArticlesMapper;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private ArticleQuantityService articleQuantityService;
@Autowired
private HBaseStorageClient hBaseStorageClient;
@Override
public List selectList(ApHotArticles apHotArticlesQuery) {
return apHotArticlesMapper.selectList(apHotArticlesQuery);
}/**
* 根据ID删除
*
* @param id
*/
@Override
public void deleteById(Integer id) {
log.info("删除热数据,apArticleId:{}", id);
apHotArticlesMapper.deleteById(id);
}/**
* 查询一个月之前的数据
*
* @return
*/
@Override
public List selectExpireMonth() {
return apHotArticlesMapper.selectExpireMonth();
}/**
* 删除过去的热数据
*
* @param apHotArticle
*/
@Override
public void deleteHotData(ApHotArticles apHotArticle) {
deleteById(apHotArticle.getId());
String rowKey = DataConvertUtils.toString(apHotArticle.getId());
hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);
MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);
if (null != mongoStorageEntity) {
mongoTemplate.remove(mongoStorageEntity);
}
}/**
* 插入操作
*
* @param apHotArticles
*/
@Override
public void insert(ApHotArticles apHotArticles) {
apHotArticlesMapper.insert(apHotArticles);
}/**
* 热点数据同步方法
*
* @param apArticleId
*/
@Override
public void hotApArticleSync(Integer apArticleId) {
log.info("开始将热数据同步,apArticleId:{}", apArticleId);
ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);
if (null != articleQuantity) {
//热点数据同步到DB中
hotApArticleToDBSync(articleQuantity);
//热点数据同步到MONGO
hotApArticleMongoSync(articleQuantity);
log.info("热数据同步完成,apArticleId:{}", apArticleId);
} else {
log.error("找不到对应的热数据,apArticleId:{}", apArticleId);
}
}/**
* 获取热数据的ArticleQuantity 对象
*
* @param apArticleId
* @return
*/
private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {
Long id = Long.valueOf(apArticleId);
ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);
if (null == articleQuantity) {
articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);
}
return articleQuantity;
}/**
* 热数据 到数据库Mysql的同步
*
* @param articleQuantity
*/
public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {
Integer apArticleId = articleQuantity.getApArticleId();
log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId);
if (null == apArticleId) {
log.error("apArticleId不存在无法进行同步");
return;
}
ApHotArticles apHotArticlesQuery = new ApHotArticles() {{
setArticleId(apArticleId);
}};
List apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);
if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
} else {
ApHotArticles apHotArticles = articleQuantity.getApHotArticles();
apHotArticlesMapper.insert(apHotArticles);
}
log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId);
}/**
* 热数据向从Hbase到Mongodb同步
*
* @param articleQuantity
*/
public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {
Integer apArticleId = articleQuantity.getApArticleId();
log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId);
if (null == apArticleId) {
log.error("apArticleId不存在无法进行同步");
return;
}
String rowKeyId = DataConvertUtils.toString(apArticleId);
MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);
if (null != mongoStorageEntity) {
log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
} else {
List storageDataList = articleQuantity.getStorageDataList();
if (null != storageDataList && !storageDataList.isEmpty()) {
mongoStorageEntity = new MongoStorageEntity();
mongoStorageEntity.setDataList(storageDataList);
mongoStorageEntity.setRowKey(rowKeyId);
mongoTemplate.insert(mongoStorageEntity);
}}
log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId);
}
}
8 定时同步数据 8.1 全量数据从mysql同步到HBase
@Component
@DisallowConcurrentExecution
@Log4j2
/**
* 全量数据从mysql 同步到HBase
*/
public class MigrationDbToHBaseQuartz extends AbstractJob {@Autowired
private ArticleQuantityService articleQuantityService;
@Override
public String[] triggerCron() {
/**
* 2019/8/9 10:15:00
* 2019/8/9 10:20:00
* 2019/8/9 10:25:00
* 2019/8/9 10:30:00
* 2019/8/9 10:35:00
*/
return new String[]{"0 0/5 * * * ?"};
}@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
log.info("开始进行数据库到HBASE同步任务");
articleQuantityService.dbToHbase();
log.info("数据库到HBASE同步任务完成");
}}
8.2 定期删除过期的数据
/**
* 定期删除过期的数据
*/
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {@Autowired
private ApHotArticleService apHotArticleService;
@Override
public String[] triggerCron() {
/**
* 2019/8/9 22:30:00
* 2019/8/10 22:30:00
* 2019/8/11 22:30:00
* 2019/8/12 22:30:00
* 2019/8/13 22:30:00
*/
return new String[]{"0 30 22 * * ?"};
}@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
long cutrrentTime = System.currentTimeMillis();
log.info("开始删除数据库过期数据");
deleteExpireHotData();
log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);
}/**
* 删除过期的热数据
*/
public void deleteExpireHotData() {
List apHotArticlesList = apHotArticleService.selectExpireMonth();
if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
for (ApHotArticles apHotArticle : apHotArticlesList) {
apHotArticleService.deleteHotData(apHotArticle);
}
}
}}
9 消息接收同步数据 9.1 文章审核成功同步 9.1.1 消息发送 (1)消息名称定义及消息发送方法声明
maven_test.properties
kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test
kafka.properties
kafka.topic.article-audit-success=${kafka.topic.article-audit-success}
com.heima.common.kafka.KafkaTopicConfig新增属性
/**
* 审核成功
*/
String articleAuditSuccess;
com.heima.common.kafka.KafkaSender
/**
* 发送审核成功消息
*/
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {
ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage();
temp.setData(message);
this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp);
}
(2)修改自动审核代码,爬虫和自媒体都要修改
在审核成功后,发送消息
爬虫
//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);
自媒体
//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);
9.1.2消息接收
/**
* 热点文章监听类
*/
@Component
@Log4j2
public class MigrationAuditSucessArticleListener implements KafkaListener {
/**
* 通用转换mapper
*/
@Autowired
ObjectMapper mapper;
/**
* kafka 主题 配置
*/
@Autowired
KafkaTopicConfig kafkaTopicConfig;
@Autowired
private ArticleQuantityService articleQuantityService;
@Override
public String topic() {
return kafkaTopicConfig.getArticleAuditSuccess();
}/**
* 监听消息
*
* @param data
* @param consumer
*/
@Override
public void onMessage(ConsumerRecord data, Consumer, ?> consumer) {
log.info("kafka接收到审核通过消息:{}", data);
String value = https://www.it610.com/article/(String) data.value();
if (null != value) {
ArticleAuditSuccessMessage message = null;
try {
message = mapper.readValue(value, ArticleAuditSuccessMessage.class);
} catch (IOException e) {
e.printStackTrace();
}
ArticleAuditSuccess auto = message.getData();
if (null != auto) {
//调用方法 将HBAESE中的热数据进行同步
Integer articleId = auto.getArticleId();
if (null != articleId) {
articleQuantityService.dbToHbase(articleId);
}
}
}}
}
9.2 热点文章同步 创建监听类:
com.heima.migration.kafka.listener.MigrationHotArticleListener
/**
* 热点文章监听类
*/
@Component
@Log4j2
public class MigrationHotArticleListener implements KafkaListener {
/**
* 通用转换mapper
*/
@Autowired
ObjectMapper mapper;
/**
* kafka 主题 配置
*/
@Autowired
KafkaTopicConfig kafkaTopicConfig;
/**
* 热点文章service注入
*/
@Autowired
private ApHotArticleService apHotArticleService;
@Override
public String topic() {
return kafkaTopicConfig.getHotArticle();
}/**
* 监听消息
*
* @param data
* @param consumer
*/
@Override
public void onMessage(ConsumerRecord data, Consumer, ?> consumer) {
log.info("kafka接收到热数据同步消息:{}", data);
String value = https://www.it610.com/article/(String) data.value();
if (null != value) {
ApHotArticleMessage message = null;
try {
message = mapper.readValue(value, ApHotArticleMessage.class);
} catch (IOException e) {
e.printStackTrace();
}
Integer articleId = message.getData().getArticleId();
if (null != articleId) {
//调用方法 将HBAESE中的热数据进行同步
apHotArticleService.hotApArticleSync(articleId);
}
}
}
}
推荐阅读
- java|阿里力荐(这本Java性能调优实战,MySQL+JVM+Tomcat问题迎刃而解)
- java|纯 Java 撸个后台管理系统,这框架用起来贼好
- java|阿里大数据面试题集合(Hadoop+HBase+Spark+Zookeeper)
- 面试|PHP 基础知识
- Java|4000 字详解TCP超时与重传,看完没收获算我输
- spring|SpringCloud微服务全家桶
- 图分析的22种算法与图形理解
- 数据结构|数据结构 - 二叉树,二叉查找树,平衡二叉树,红黑树
- C++|C++ - Lambda表达式