Spectrum: Notes on Troubleshooting and Fixing Blocks That Occasionally Stop Syncing
Tracking down the sync failure
Repository: https://github.com/SmartMeshFoundation/Spectrum
The fix will be committed on the dev branch and released together with version 0.5.2.
- Problem description:
When a node is a genesis node and has not entered the rotation set, it stays in a waiting-for-nomination state and will never be nominated successfully, because the consensus contract explicitly rejects nominating genesis nodes.
In that state, after syncing a certain number of new blocks, the chain stops growing at some height; in this observation it stopped at block 1222972 and never advanced.
- Log clues:
DEBUG[11-21|19:21:46|core/blockchain.go:1009] Inserted new block number=1222972 hash=7af7f5…985c9e uncles=0 txs=1 gas=0 elapsed=11.820ms
DEBUG[11-21|19:21:52|eth/downloader/downloader.go:1562] Recalculated downloader QoS values rtt=11.836220508s confidence=1.000 ttl=35.508697032s
DEBUG[11-21|19:22:00|eth/handler.go:642] <> trueTD=3077093 p.td=3077078 p.id=f6cb0c9800fb01fb11cb313848091596ba4fd167e1c7873e0520ebdd59ceb454cf5a16d7f78ff7aaa91f117ad6694bca4de63d3150cb1b48813d75d4b98e2deb
DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607] Discarded propagated block, exceeded allowance peer=f6cb0c9800fb01fb number=1222973 hash=33b369…26e683 limit=64
DEBUG[11-21|19:22:00|eth/peer.go:193] Fetching single header id=4b36359d6b54ab46 conn=dyndial hash=33b369…26e683
DEBUG[11-21|19:22:00|eth/peer.go:214] Fetching batch of block bodies id=4b36359d6b54ab46 conn=dyndial count=1
DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607] Discarded propagated block, exceeded allowance peer=4b36359d6b54ab46 number=1222973 hash=33b369…26e683 limit=64
DEBUG[11-21|19:22:03|eth/downloader/downloader.go:1562] Recalculated downloader QoS values rtt=11.836220508s confidence=1.000 ttl=35.508697032s
DEBUG[11-21|19:22:14|eth/handler.go:642] <> trueTD=3077096 p.td=3077081 p.id=6549749a9e83b4bd89e1469d51986cc1689094b6621daa651d3e76dc9720659008cad99e949d274b6c26e87241964775e22a01e167b79b85dd454fd160b46fac
DEBUG[11-21|19:22:14|eth/fetcher/fetcher.go:631] Queued propagated block peer=6549749a9e83b4bd number=1222974 hash=670fb2…f0af05 queued=1
Block sync logic: the logs keep showing the NewBlockMsg handling output, so the problem is not in the network layer.
It is also unrelated to block downloading; all of that logic appears to work normally.
- Entry point:
func (pm *ProtocolManager) synchronise(peer *peer)
It is triggered from three places. In eth/handler.go, func (pm *ProtocolManager) handleMsg(p *peer) error processes all protocol messages, including NewBlockMsg:
1. the NewBlockMsg message
2. a new peer connection
3. a periodic sync timer (triggers 2 and 3 live in the syncer loop; see the sketch below)
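For reference, this is a minimal sketch of how triggers 2 and 3 typically look in the syncer loop of eth/sync.go in upstream go-ethereum, which Spectrum is based on; identifiers such as forceSyncCycle and minDesiredPeerCount are taken from upstream and may differ slightly in Spectrum:

// eth/sync.go (abridged sketch): synchronise is fired on new-peer events and on a timer.
func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of the sync mechanisms
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // Wait for different events to fire synchronisation operations
    forceSync := time.NewTicker(forceSyncCycle)
    defer forceSync.Stop()

    for {
        select {
        case <-pm.newPeerCh:
            // Trigger 2: a new peer connected; sync once enough peers are available
            if pm.peers.Len() < minDesiredPeerCount {
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // Trigger 3: the periodic timer forces a sync attempt
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

Trigger 1, the NewBlockMsg path, is the relevant one here; its handling in handleMsg looks like this: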
case msg.Code == NewBlockMsg:
    // Retrieve and decode the propagated block
    var request newBlockData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    request.Block.ReceivedAt = msg.ReceivedAt
    request.Block.ReceivedFrom = p

    // Mark the peer as owning the block and schedule it for import
    p.MarkBlock(request.Block.Hash())
    pm.fetcher.Enqueue(p.id, request.Block)

    // Assuming the block is importable by the peer, but possibly not yet done so,
    // calculate the head hash and TD that the peer truly must have.
    var (
        trueHead = request.Block.ParentHash()
        trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
    )
    _, tttt := p.Head()
    currentBlock := pm.blockchain.CurrentBlock()
    peerpub, _ := p.ID().Pubkey()
    peeraddr := crypto.PubkeyToAddress(*peerpub)
    log.Debug("<>",
        "currentBlock", currentBlock,
        "recvBlock", request.Block.Number(),
        "currentTD", pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
        "trueTD", trueTD,
        "p.td", tttt,
        "p.id", peeraddr.Hex(),
    )
    // Update the peers total difficulty if better than the previous
    if _, td := p.Head(); trueTD.Cmp(td) > 0 {
        p.SetHead(trueHead, trueTD)

        // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
        // a single block (as the true TD is below the propagated block), however this
        // scenario should easily be covered by the fetcher.
        //currentBlock := pm.blockchain.CurrentBlock()
        if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
            go pm.synchronise(p)
        }
    }
What does pm.fetcher.Enqueue(p.id, request.Block) actually do?
// Enqueue tries to fill gaps in the fetcher's future import queue.
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
    op := &inject{
        origin: peer,
        block:  block,
    }
    select {
    case f.inject <- op:
        return nil
    case <-f.quit:
        return errTerminated
    }
}
It pushes the operation onto fetcher.inject, which is then handled by loop(), as in the following snippet:
case op := <-f.inject:
    // A direct block insertion was requested, try and fill any pending gaps
    propBroadcastInMeter.Mark(1)
    f.enqueue(op.origin, op.block)
f.enqueue contains some very important rules:
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    // If this peer already has more than 64 blocks queued for processing,
    // the current block is discarded (blockLimit = 64)
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    // A block more than 7 behind the current head (an old uncle) or more than
    // 32 ahead of it is discarded (maxUncleDist = 7, maxQueueDist = 32)
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}
So enqueue mainly pushes the block into f.queue, which is then drained in a loop inside func (f *Fetcher) loop(). My feeling was that the problem lives in this function: the queue-handling logic at its start might be wrong (see the sketch below).
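For context, the queue-draining part at the head of loop() looks roughly like this (abridged from upstream go-ethereum's eth/fetcher/fetcher.go; Spectrum's copy is assumed to match):

// Head of (f *Fetcher) loop(), abridged: import any queued blocks that could fit.
for {
    // ...expired-fetch cleanup elided...
    height := f.chainHeight()
    for !f.queue.Empty() {
        op := f.queue.PopItem().(*inject)
        hash := op.block.Hash()
        if f.queueChangeHook != nil {
            f.queueChangeHook(hash, false)
        }
        // If the block is too far ahead of our head, re-queue it for later
        number := op.block.NumberU64()
        if number > height+1 {
            f.queue.Push(op, -float32(number))
            if f.queueChangeHook != nil {
                f.queueChangeHook(hash, true)
            }
            break
        }
        // Otherwise, if it is fresh and still unknown, try to import it
        if number+maxUncleDist < height || f.getBlock(hash) != nil {
            f.forgetBlock(hash)
            continue
        }
        f.insert(op.origin, op.block)
    }
    // ...select over the notification/inject/done/fetch channels follows...
}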
The problem shows up when the if count > blockLimit condition becomes true. Why would that condition ever hold?
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    ......
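The per-peer counter f.queues[peer] only ever decreases when a queued block is forgotten, and that happens in loop() once a pending import reports completion on f.done. A sketch of the relevant pieces (abridged from upstream go-ethereum's fetcher; assumed to be the same in Spectrum):

// In (f *Fetcher) loop(): when an import finishes, its hash arrives on f.done
// and all traces of the block are dropped, which decrements the per-peer count.
case hash := <-f.done:
    f.forgetHash(hash)
    f.forgetBlock(hash)

// forgetBlock removes all traces of a queued block from the fetcher's internal state.
func (f *Fetcher) forgetBlock(hash common.Hash) {
    if insert := f.queued[hash]; insert != nil {
        f.queues[insert.origin]--
        if f.queues[insert.origin] == 0 {
            delete(f.queues, insert.origin)
        }
        delete(f.queued, hash)
    }
}

So if an import never completes, nothing is ever sent on f.done, the per-peer counter can only grow, and after 64 stuck blocks every further block from that peer is discarded with "exceeded allowance".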
After adding more logging for analysis, it turned out that insertChain was not returning properly; something inside it blocks.
// insert spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *Fetcher) insert(peer string, block *types.Block) {
    ......
    log.Debug("insert_begin", "number", block.Number())
    // this call did not return
    if _, err := f.insertChain(types.Blocks{block}); err != nil {
        log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        return
    }
    log.Debug("insert_end", "number", block.Number())
    ......
}
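Note that f.insertChain is the callback handed to the fetcher when the protocol manager is built, and it forwards to BlockChain.InsertChain. A rough sketch of the wiring in eth/handler.go (abridged from upstream go-ethereum, assumed similar in Spectrum):

// NewProtocolManager (abridged): the inserter callback forwards to the chain,
// so a block stuck inside BlockChain.InsertChain stalls the fetcher's import goroutine.
inserter := func(blocks types.Blocks) (int, error) {
    atomic.StoreUint32(&manager.acceptTxs, 1) // mark initial sync done on any fetcher import
    return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator,
    manager.BroadcastBlock, heighter, inserter, manager.removePeer)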
Whenever only insert_begin showed up, with no matching insert_end or insert_done, the next step was to grep the logs for the Inserted keyword:
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:672] insert_begin number=1253372
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:674] insert_end number=1253372 err=nil
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:645] insert_done number=1253372
It was finally confirmed that the blocking happens in the event broadcasting.
//core/blockchain.go
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
    n, events, logs, err := bc.insertChain(chain)
    // this call blocks
    bc.PostChainEvents(events, logs)
    log.Debug("Inserted block end", "number", chain[0].Number())
    return n, err
}

// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
    // post event logs for further processing
    if logs != nil {
        bc.logsFeed.Send(logs)
    }
    for _, event := range events {
        switch ev := event.(type) {
        case ChainEvent:
            bc.chainFeed.Send(ev)
        case ChainHeadEvent:
            // the blocking above is caused by this Send blocking
            bc.chainHeadFeed.Send(ev)
        case ChainSideEvent:
            bc.chainSideFeed.Send(ev)
        }
    }
}
You can see the blocking is caused by publishing the ChainHead event. The first step is to find everything that subscribes to this feed; there could be lock contention, or perhaps an unfinished call on a subscriber. What is odd is that the node only starts blocking after it has been running for a while.
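The mechanism matters here: event.Feed.Send only returns once every subscribed channel has accepted the value, so a single subscriber that stops draining its channel eventually stalls every publisher. A minimal, self-contained demonstration of that behaviour (illustration only, not Spectrum code):

package main

import (
    "fmt"
    "time"

    "github.com/ethereum/go-ethereum/event"
)

func main() {
    var feed event.Feed
    ch := make(chan int, 2) // a small buffered channel, like a real subscriber would register
    sub := feed.Subscribe(ch)
    defer sub.Unsubscribe()

    done := make(chan struct{})
    go func() {
        // Nothing ever reads ch, so once its buffer is full, Send blocks
        // and this goroutine never finishes.
        for i := 0; i < 5; i++ {
            feed.Send(i)
        }
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("all sends returned")
    case <-time.After(time.Second):
        fmt.Println("Send is blocked after filling the subscriber's buffer; queued:", len(ch))
    }
}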
By going through the subscribers of chainHeadFeed, the problem was finally pinned down. In the worker's update method, commitNewWork() is executed whenever a ChainHeadEvent is received; the first execution succeeds, the second fails and blocks. The channel this subscription receives events on has 10 slots (see the sketch below), so after 10 events the event broadcast module is completely blocked. Once 64 blocks from some peer have piled up behind the blocked event send, that peer is judged to be a DoSing node, and the phenomenon described above appears: a block can be lost, leaving the node out of sync.
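For reference, the subscription side looks roughly like this in the miner worker (abridged from upstream go-ethereum's miner/worker.go; Spectrum's worker is assumed to be structured the same way):

// miner/worker.go (abridged sketch): the worker receives chain head events on a
// buffered channel with 10 slots and reacts by committing new work.
const chainHeadChanSize = 10

// in newWorker():
worker.chainHeadCh = make(chan core.ChainHeadEvent, chainHeadChanSize)
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)

// in (self *worker) update():
for {
    select {
    // Handle ChainHeadEvent: build the next mining work unit
    case <-self.chainHeadCh:
        self.commitNewWork() // this is the call that blocks the second time around
    // ...other cases (ChainSideEvent, TxPreEvent, subscription errors) elided...
    }
}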
- The problem can be fixed with the following logic adjustment:
Debugging inside worker.commitNewWork() showed that it blocks on self.push(work):
// push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) {
    // Before the miner has properly started, this branch should be taken.
    // But because miner.start had earlier been made asynchronous, the flag was
    // mistakenly set to "started" while the agents had not been started yet,
    // which eventually makes the for loop below block.
    if atomic.LoadInt32(&self.mining) != 1 {
        return
    }
    for agent := range self.agents {
        atomic.AddInt32(&self.atWork, 1)
        if ch := agent.Work(); ch != nil {
            ch <- work
        }
    }
}

The fix is simple, even though tracking the problem down was not: just move the assignment of self.mining further down:
func (self *worker) start(s chan int) {
    self.mu.Lock()
    defer self.mu.Unlock()

    // moved below
    //atomic.StoreInt32(&self.mining, 1)//add by liangc : sync mining status
    wg := new(sync.WaitGroup)
    if tribe, ok := self.engine.(*tribe.Tribe); ok {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if self.chain.CurrentHeader().Number.Int64() > 1 { // free for genesis signer
                log.Info("?? Everything is ready, Waiting for nomination, pending until miner level upgrade")
                // pending until miner level upgrade
                tribe.WaitingNomination()
            }
            tribe.SetMining(1, self.chain.CurrentBlock().Number(), self.chain.CurrentHeader().Hash())
        }()
    }
    go func() {
        defer func() { s <- 1 }()
        wg.Wait()
        // moved down from above >>>>
        atomic.StoreInt32(&self.mining, 1)
        // moved down from above <<<<
        // spin up agents
        for agent := range self.agents {
            agent.Start()
        }
    }()
}
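To make the failure mode concrete, here is a tiny standalone illustration (not Spectrum code) of the same bug class: a readiness flag flipped before the consumer exists leaves producers stuck on an unread channel.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

var (
    ready int32
    work  = make(chan int) // unbuffered, standing in for an agent's work channel
)

// produce mimics worker.push: it trusts the readiness flag and then sends.
func produce(v int) {
    if atomic.LoadInt32(&ready) != 1 {
        return // not ready: drop the task instead of blocking
    }
    work <- v // blocks forever if no consumer was ever started
}

func main() {
    atomic.StoreInt32(&ready, 1) // flag set too early: no consumer is running yet

    done := make(chan struct{})
    go func() {
        produce(42)
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("task delivered")
    case <-time.After(time.Second):
        fmt.Println("producer is stuck: the flag said ready, but nothing reads the work channel")
    }
}

Setting self.mining only after the agents have been started removes exactly this window.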