1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
func (cluster *mongoCluster) syncServersIteration(direct bool) { log("SYNC Starting full topology synchronization...")
var wg sync.WaitGroup var m sync.Mutex notYetAdded := make(map[string]pendingAdd) addIfFound := make(map[string]bool) seen := make(map[string]bool) syncKind := partialSync
var spawnSync func(addr string, byMaster bool) spawnSync = func(addr string, byMaster bool) { // ✨ 标记开启了一个新线程 wg.Add(1) go func() { defer wg.Done()
// ✨ 解析地址,这个resolveAddr实现好🐂🍺,考虑的较全面 // ✨ (不进去看代码了)先解析ip地址,不是ip地址就当初udp连接解析,因为拨号连接可能耗时,用了select管道并发处理,这里分别以udp4、udp6进行尝试 tcpaddr, err := resolveAddr(addr) if err != nil { log("SYNC Failed to start sync of ", addr, ": ", err.Error()) return } resolvedAddr := tcpaddr.String()
// ✨ 已发现地址的逻辑,要加锁 m.Lock() if byMaster { if pending, ok := notYetAdded[resolvedAddr]; ok { delete(notYetAdded, resolvedAddr) m.Unlock() cluster.addServer(pending.server, pending.info, completeSync) return } addIfFound[resolvedAddr] = true } // ✨ 如果已经发现该地址,stop if seen[resolvedAddr] { m.Unlock() return } // ✨ 标记当前地址已被发现 seen[resolvedAddr] = true m.Unlock()
server := cluster.server(addr, tcpaddr) // ✨ 同步server信息 info, hosts, err := cluster.syncServer(server) if err != nil { // ✨ 同步信息出错,移除 cluster.removeServer(server) return }
m.Lock() add := direct || info.Master || addIfFound[resolvedAddr] if add { syncKind = completeSync } else { notYetAdded[resolvedAddr] = pendingAdd{server, info} } m.Unlock() if add { cluster.addServer(server, info, completeSync) } if !direct { for _, addr := range hosts { spawnSync(addr, info.Master) } } }() }
knownAddrs := cluster.getKnownAddrs() for _, addr := range knownAddrs { spawnSync(addr, false) } wg.Wait()
if syncKind == completeSync { logf("SYNC Synchronization was complete (got data from primary).") for _, pending := range notYetAdded { cluster.removeServer(pending.server) } } else { logf("SYNC Synchronization was partial (cannot talk to primary).") for _, pending := range notYetAdded { cluster.addServer(pending.server, pending.info, partialSync) } }
cluster.Lock() mastersLen := cluster.masters.Len() logf("SYNC Synchronization completed: %d master(s) and %d slave(s) alive.", mastersLen, cluster.servers.Len()-mastersLen)
// Update dynamic seeds, but only if we have any good servers. Otherwise, // leave them alone for better chances of a successful sync in the future. if syncKind == completeSync { dynaSeeds := make([]string, cluster.servers.Len()) for i, server := range cluster.servers.Slice() { dynaSeeds[i] = server.Addr } cluster.dynaSeeds = dynaSeeds debugf("SYNC New dynamic seeds: %#v\n", dynaSeeds) } cluster.Unlock() }
|