
笔者关注ZooKeeper有一段时间,从ZooKeeper提供的API中,发现有一个比较有意思的API,叫sync。但一直不太明白sync API的含义。

Sometimes developers mistakenly assume one other guarantee that ZooKeeper does not in fact make. This is:
Simultaneously Consistent Cross-Client Views
ZooKeeper does not guarantee that at every instance in time, two different clients will have identical views of ZooKeeper data. Due to factors like network delays, one client may perform an update before another client gets notified of the change. Consider the scenario of two clients, A and B. If client A sets the value of a znode /a from 0 to 1, then tells client B to read /a, client B may read the old value of 0, depending on which server it is connected to. If it is important that Client A and Client B read the same value, Client B should should call the sync() method from the ZooKeeper API method before it performs its read.
So, ZooKeeper by itself doesn't guarantee that changes occur synchronously across all servers, but ZooKeeper primitives can be used to construct higher level functions that provide useful client synchronization.
@Override public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, ""); } if (request == Request.requestOfDeath) { break; } // We want to queue the request to be processed before we submit // the request to the leader so that we are ready to receive // the response nextProcessor.processRequest(request); // We now ship the request to the leader. As with all // other quorum operations, sync also follows this code // path, but different from others, we need to keep track // of the sync operations this follower has pending, so we // add it to pendingSyncs. switch (request.type) { case OpCode.sync: // 可以看到sync请求和其它事务型请求的的区别在于,除了发送给leader之外,还要记录到pendingSyncs里 zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.delete: case OpCode.setData: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: case OpCode.multi: zks.getFollower().request(request); break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); }

// 接收到leader发送的Leader.SYNC消息后,才真正commit这个请求 synchronized public void sync(){ if(pendingSyncs.size() ==0){ LOG.warn("Not expecting a sync."); return; }Request r = pendingSyncs.remove(); commitProcessor.commit(r); }

synchronized public void processSync(LearnerSyncRequest r){ if(outstandingProposals.isEmpty()){ sendSync(r); } else { List l = pendingSyncs.get(lastProposed); if (l == null) { l = new ArrayList(); } l.add(r); pendingSyncs.put(lastProposed, l); } }

