聊聊chronos的pullFromDefaultCFAndPush

序 本文主要研究一下chronos的pullFromDefaultCFAndPush
pullFromDefaultCFAndPush DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {//......public void pullFromDefaultCFAndPush() { final long seekTimestamp = MetaService.getSeekTimestamp(); final long zkSeekTimestamp = MetaService.getZkSeekTimestamp(); // backup的seekTimestamp不能超过master的seekTimestamp if (MasterElection.isBackup()) { if (seekTimestamp >= zkSeekTimestamp) { LOGGER.debug("backup's pull from db should stop for seekTimestamp > zkSeekTimestamp, seekTimestamp:{}, zkSeekTimestamp:{}, Thread:{}", seekTimestamp, zkSeekTimestamp, Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { } return; } }// seekTimestamp不能超过当前时间 final long now = TsUtils.genTS(); if (seekTimestamp > now) { LOGGER.debug("pull from db should stop for seekTimestamp > now, seekTimestamp:{}, now:{}, Thread:{}", seekTimestamp, now, round, Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { } return; }round++; final long start = System.currentTimeMillis(); final long diff = start / 1000 - seekTimestamp; LOGGER.info("pull from db start, seekTimestamp:{}, currTimestamp:{}, diff:{} round:{}", seekTimestamp, start / 1000, diff, round); MetricService.putSeekLatency(MasterElection.getState().toString(), diff + 10); // 因为0上传到metric之后不显示// 迭代出当前 seekTimestamp 下所有数据 int count = 0; try (RocksIterator it = RDB.newIterator(CFManager.CFH_DEFAULT)) { for (it.seek(KeyUtils.genSeekKey(seekTimestamp)); it.isValid(); it.next()) { final String dMsgId = new String(it.key()); final InternalKey internalKey = new InternalKey(dMsgId); //......boolean needMetricWriteQpsAfterSplit = false; // 循环消息需要插入一条新的消息, 如果失效, 则不再插入 if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue() || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { final InternalKey nextInternalKey = new InternalKey(internalKey).nextUniqDelayMsgId(); if (!KeyUtils.isInvalidMsg(nextInternalKey)) { batcher.putToDefaultCF(nextInternalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value(), null, nextInternalKey, Actions.ADD.getValue()); needMetricWriteQpsAfterSplit = true; } }byte[] bytes = it.value(); if (internalKey.getSegmentNum() > 0) { try { OUTPUT.write(it.value()); LOGGER.info("segment merge, dMsgId:{}, value.len:{}, value.acc.len:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value().length, OUTPUT.size()); if (internalKey.getSegmentNum() != (internalKey.getSegmentIndex() - Constants.SEGMENT_INDEX_BASE + 1)) { continue; } bytes = OUTPUT.toByteArray(); OUTPUT.reset(); } catch (IOException e) { LOGGER.error("error while output.write byte array, msg:{}", e.getMessage(), e); } }// 如果解析不出来, 说明格式有问题, 抛弃掉该条消息, 不阻塞 final InternalValue internalValue = https://www.it610.com/article/JsonUtils.fromJsonString(bytes, InternalValue.class); if (internalValue == null) { continue; }//......count++; try { blockingQueue.put(new InternalPair(internalKey, internalValue)); } catch (InterruptedException e) { LOGGER.error("error while put to blockingQueue, dMsgId:{}", dMsgId); }if (count % INTERNAL_PAIR_COUNT == 0) { sendConcurrent(blockingQueue, round); } }sendConcurrent(blockingQueue, round); }needCancelMap.forEach((uniqDelayMsgId, tombstoneKey) -> { final InternalKey internalKey = new InternalKey(uniqDelayMsgId); final InternalKey tombstoneInternalKey = new InternalKey(tombstoneKey); // 残留的循环消息取消需要重新添加进去, 否则会删除不掉 if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue() || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { final InternalKey nextTombstoneKey = tombstoneInternalKey.nextUniqDelayMsgId(); final InternalKey nextInternalKey = internalKey.nextUniqDelayMsgId(); if (!KeyUtils.isInvalidMsg(nextTombstoneKey)) { String topic = needCancelTopicMap.get(uniqDelayMsgId); batcher.putToDefaultCF(nextTombstoneKey.genUniqDelayMsgId(), new CancelWrap(nextInternalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, nextInternalKey, Actions.CANCEL.getValue()); } else { LOGGER.info("pull from db succ cancel message of tombstone key, tombstone dMsgId:{}", nextTombstoneKey.genUniqDelayMsgId()); } } }); batcher.flush(); needCancelMap.clear(); needCancelTopicMap.clear(); // 更新offset MetaService.nextSeekTimestamp(); LOGGER.info("pull from db finish push, pushCost:{}ms, count:{}, seekTimestamp:{}, round:{}", System.currentTimeMillis() - start, count, seekTimestamp, round); }//...... }

  • pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()
sendConcurrent DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java
public class MqPushService {//......private void sendConcurrent(final BlockingQueue blockingQueue, final long round) { if (blockingQueue.size() == 0) { LOGGER.info("pull from db sendConcurrent start, return for no message to send, round:{}", round); return; }final long sendCount = blockingQueue.size(); LOGGER.info("pull from db sendConcurrent start, send count:{}, round:{}", sendCount, round); final long start = System.currentTimeMillis(); final CountDownLatch cdl = new CountDownLatch(blockingQueue.size()); InternalPair internalPair; while ((internalPair = blockingQueue.poll()) != null) { final InternalPair immutableInternalPair = internalPair; pushThreadPool.execute(() -> { while (!send( immutableInternalPair.getInternalValue().getTopic(), immutableInternalPair.getInternalValue().getBody().getBytes(Charsets.UTF_8), immutableInternalPair.getInternalKey(), immutableInternalPair.getInternalValue().getTags(), immutableInternalPair.getInternalValue().getProperties(), false)) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { } } cdl.countDown(); }); }try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } final long cost = System.currentTimeMillis() - start; LOGGER.info("pull from db sendConcurrent end, send count:{}, round:{}, cost:{}ms", sendCount, round, cost); }//...... }

  • sendConcurrent方法会执行blockingQueue.poll(),然后执行send方法
send 【聊聊chronos的pullFromDefaultCFAndPush】DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java
public class MqPushService {//......private boolean send(final String topic, final byte[] body, final InternalKey internalKey, final String tags, final Map properties, final boolean direct) { final long start = System.nanoTime(); final String key = internalKey.genUniqDelayMsgId(); MetricMsgType metricMsgType; if (internalKey.getType() == MsgTypes.DELAY.getValue()) { metricMsgType = MetricMsgType.DELAY; } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) { metricMsgType = MetricMsgType.LOOP_DELAY; } else { metricMsgType = MetricMsgType.UNKNOWN; }int len = 0; if (body != null) { len = body.length; }if (MasterElection.isBackup()) { if (direct) { LOGGER.info("succ send message(but cancel for backup) directly, topic:{}, dMsgId:{}, len:{}", topic, key, len); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.BACKUP); } else { LOGGER.info("succ send message(but cancel for backup) from db, topic:{}, dMsgId:{}, len:{}", topic, key, len); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.BACKUP); } return true; }if (ConfigManager.getConfig().isFakeSend()) { try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { } if (direct) { LOGGER.info("succ send message directly(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len); } else { LOGGER.info("succ send message from db(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len); } return true; }MessageBuilder messageBuilder = producer.messageBuilder().setTopic(topic).setBody(body).setKey(key).setTags(tags).setRandomPartition(); if (properties != null && properties.size() > 0) { for (Map.Entry entry : properties.entrySet()) { LOGGER.debug("properties, topic:{}, dMsgId:{}, key:{}, value:{}", topic, key, entry.getKey(), entry.getValue()); // IMPORTANT: If use addProperty for isPressureTraffic, the property will be ignored if (PRESSURE_TRAFFIC_KEY.equals(entry.getKey())) { messageBuilder.setPressureTraffic(Boolean.parseBoolean(entry.getValue())); } else { messageBuilder.addProperty(entry.getKey(), entry.getValue()); } } } messageBuilder.addProperty(PROPERTY_KEY_FROM_CHRONOS, PROPERTY_KEY_FROM_CHRONOS); final Result result = messageBuilder.send(); final long cost = (System.nanoTime() - start) / 1000; MetricService.putPushLatency(topic, cost); if (result.getCode() == CarreraReturnCode.OK) { if (direct) { LOGGER.info("succ send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.OK); } else { LOGGER.info("succ send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.OK); } return true; } else if (result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_EXIST || result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_ALLOWED || result.getCode() == CarreraReturnCode.FAIL_ILLEGAL_MSG || result.getCode() == CarreraReturnCode.MISSING_PARAMETERS) { if (direct) { LOGGER.error("fail send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL); } else { LOGGER.error("fail send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL); } return true; } else { if (direct) { LOGGER.error("error while send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL); } else { LOGGER.error("error while send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost); MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL); } return false; } }//...... }

  • send方法主要是构造messageBuilder,然后执行messageBuilder.send()
小结 pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()
doc
  • carrera-chronos

    推荐阅读