(*本文为《深入理解以太坊 P2P 网络设计》的下篇,为了更流畅阅读,请移步主页阅读上篇。) 终止服务 Stop函数用于终止节点运行,具体代码如下所示: func?(srv?*Server)?Stop()?{ ???srv.lock.Lock() ???if?!srv.running?{ ???????srv.lock.Unlock() ???????return ??} ???srv.running?=?false ???if?srv.listener?!=?nil?{ ???????//?this?unblocks?listener?Accept ???????srv.listener.Close() ??} ???close(srv.quit) ???srv.lock.Unlock() ???srv.loopWG.Wait() } 服务启动 位于 go-ethereum-1.10.2\p2p\server.go中的start 函数用于启动一个P2P节点: //?filedir:go-ethereum-1.10.2\p2p\server.go?L433 func?(srv?*Server)?Start()?(err?error)?{ ???srv.lock.Lock() ???defer?srv.lock.Unlock() ???if?srv.running?{ ???????return?errors.New("server?already?running") ??} ???srv.running?=?true ???srv.log?=?srv.Config.Logger ???if?srv.log?==?nil?{ ???????srv.log?=?log.Root() ??} ???if?srv.clock?==?nil?{ ???????srv.clock?=?mclock.System{} ??} ???if?srv.NoDial?&&?srv.ListenAddr?==?""?{ ???????srv.log.Warn("P2P?server?will?be?useless,?neither?dialing?nor?listening") ??} ???//?static?fields ???if?srv.PrivateKey?==?nil?{ ???????return?errors.New("Server.PrivateKey?must?be?set?to?a?non-nil?key") ??} ???if?srv.newTransport?==?nil?{ ???????srv.newTransport?=?newRLPX ??} ???if?srv.listenFunc?==?nil?{ ???????srv.listenFunc?=?net.Listen ??} ???srv.quit?=?make(chan?struct{}) ???srv.delpeer?=?make(chan?peerDrop) ???srv.checkpointPostHandshake?=?make(chan?*conn) ???srv.checkpointAddPeer?=?make(chan?*conn) ???srv.addtrusted?=?make(chan?*enode.Node) ???srv.removetrusted?=?make(chan?*enode.Node) ???srv.peerOp?=?make(chan?peerOpFunc) ???srv.peerOpDone?=?make(chan?struct{}) ???if?err?:=?srv.setupLocalNode();?err?!=?nil?{ ???????return?err ??} ???if?srv.ListenAddr?!=?""?{ ???????if?err?:=?srv.setupListening();?err?!=?nil?{ ???????????return?err ??????} ??} ???if?err?:=?srv.setupDiscovery();?err?!=?nil?{ ???????return?err ??} ???srv.setupDialScheduler() ???srv.loopWG.Add(1) ???go?srv.run() ???return?nil } 在这里首先检查当前节点是否处于运行状态,如果是则直接返回并给出错误提示信息,如果不是则将srv.running设置为true,之后进入服务启动流程,之后检查log是否开启等,之后初始化配置P2P服务信息: //?Start?starts?running?the?server. //?Servers?can?not?be?re-used?after?stopping. func?(srv?*Server)?Start()?(err?error)?{ ???srv.lock.Lock() ???defer?srv.lock.Unlock() ???if?srv.running?{ ???????return?errors.New("server?already?running") ??} ???srv.running?=?true ???srv.log?=?srv.Config.Logger ???if?srv.log?==?nil?{ ???????srv.log?=?log.Root() ??} ???if?srv.clock?==?nil?{ ???????srv.clock?=?mclock.System{} ??} ???if?srv.NoDial?&&?srv.ListenAddr?==?""?{ ???????srv.log.Warn("P2P?server?will?be?useless,?neither?dialing?nor?listening") ??} ???//?static?fields ???if?srv.PrivateKey?==?nil?{ ???????return?errors.New("Server.PrivateKey?must?be?set?to?a?non-nil?key") ??} ???if?srv.newTransport?==?nil?{ ???????srv.newTransport?=?newRLPX ??} ???if?srv.listenFunc?==?nil?{ ???????srv.listenFunc?=?net.Listen ??} ???srv.quit?=?make(chan?struct{}) ???srv.delpeer?=?make(chan?peerDrop) ???srv.checkpointPostHandshake?=?make(chan?*conn) ???srv.checkpointAddPeer?=?make(chan?*conn) ???srv.addtrusted?=?make(chan?*enode.Node) ???srv.removetrusted?=?make(chan?*enode.Node) ???srv.peerOp?=?make(chan?peerOpFunc) ???srv.peerOpDone?=?make(chan?struct{}) 之后调用setupLocalNode来启动一个本地节点,并建立本地监听,然后配置一个DiscoveryV5网络协议,生成节点路由表。 调用setupDialScheduler启动主动拨号连接过程,然后开一个协程,在其中做peer的维护: srv.setupDialScheduler() ???srv.loopWG.Add(1) ???go?srv.run() ???return?nil } setupDialScheduler代码如下所示,这里通过newDialScheduler来建立连接,参数discmix确定了进行主动建立连接时的节点集,它是一个迭代器 ,同时将setupConn连接建立函数传入: func?(srv?*Server)?setupDialScheduler()?{ ???config?:=?dialConfig{ ???????self:???????????srv.localnode.ID(), ???????maxDialPeers:???srv.maxDialedConns(), ???????maxActiveDials:?srv.MaxPendingPeers, ???????log:????????????srv.Logger, ???????netRestrict:????srv.NetRestrict, ???????dialer:?????????srv.Dialer, ???????clock:??????????srv.clock, ??} ???if?srv.ntab?!=?nil?{ ???????config.resolver?=?srv.ntab ??} ???if?config.dialer?==?nil?{ ???????config.dialer?=?tcpDialer{&net.Dialer{Timeout:?defaultDialTimeout}} ??} ???srv.dialsched?=?newDialScheduler(config,?srv.discmix,?srv.SetupConn) ???for?_,?n?:=?range?srv.StaticNodes?{ ???????srv.dialsched.addStatic(n) ??} } newDialScheduler函数如下所示,在这里通过d.readNodes(it)从迭代器中取得节点,之后通过通道传入d.loop(it)中进行连接: //?filedir:go-ethereum-1.10.2\p2p\dial.go???L162 func?newDialScheduler(config?dialConfig,?it?enode.Iterator,?setupFunc?dialSetupFunc)?*dialScheduler?{ ???d?:=?&dialScheduler{ ???????dialConfig:??config.withDefaults(), ???????setupFunc:???setupFunc, ???????dialing:?????make(map[enode.ID]*dialTask), ???????static:??????make(map[enode.ID]*dialTask), ???????peers:???????make(map[enode.ID]connFlag), ???????doneCh:??????make(chan?*dialTask), ???????nodesIn:?????make(chan?*enode.Node), ???????addStaticCh:?make(chan?*enode.Node), ???????remStaticCh:?make(chan?*enode.Node), ???????addPeerCh:???make(chan?*conn), ???????remPeerCh:???make(chan?*conn), ??} ???d.lastStatsLog?=?d.clock.Now() ???d.ctx,?d.cancel?=?context.WithCancel(context.Background()) ???d.wg.Add(2) ???go?d.readNodes(it) ???go?d.loop(it) ???return?d } 服务监听 在上面的服务启动过程中有一个setupListening函数,该函数用于监听事件,具体代码如下所示: func?(srv?*Server)?setupListening()?error?{ ???//?Launch?the?listener. ???listener,?err?:=?srv.listenFunc("tcp",?srv.ListenAddr) ???if?err?!=?nil?{ ???????return?err ??} ???srv.listener?=?listener ???srv.ListenAddr?=?listener.Addr().String() ???//?Update?the?local?node?record?and?map?the?TCP?listening?port?if?NAT?is?configured. ???if?tcp,?ok?:=?listener.Addr().(*net.TCPAddr);?ok?{ ???????srv.localnode.Set(enr.TCP(tcp.Port)) ???????if?!tcp.IP.IsLoopback()?&&?srv.NAT?!=?nil?{ ???????????srv.loopWG.Add(1) ???????????go?func()?{ ???????????????nat.Map(srv.NAT,?srv.quit,?"tcp",?tcp.Port,?tcp.Port,?"ethereum?p2p") ???????????????srv.loopWG.Done() ??????????}() ??????} ??} ???srv.loopWG.Add(1) ???go?srv.listenLoop() ???return?nil } 在上述代码中又调用了一个 srv.listenLoop(),该函数是一个死循环的 goroutine,它会监听端口并接收外部的请求: //?listenLoop?runs?in?its?own?goroutine?and?accepts //?inbound?connections. func?(srv?*Server)?listenLoop()?{ ???srv.log.Debug("TCP?listener?up",?"addr",?srv.listener.Addr()) ???//?The?slots?channel?limits?accepts?of?new?connections. ???tokens?:=?defaultMaxPendingPeers ???if?srv.MaxPendingPeers?>?0?{ ???????tokens?=?srv.MaxPendingPeers ??} ???slots?:=?make(chan?struct{},?tokens) ???for?i?:=?0;?i?<?tokens;?i++?{ ???????slots?<-?struct{}{} ??} ???//?Wait?for?slots?to?be?returned?on?exit.?This?ensures?all?connection?goroutines ???//?are?down?before?listenLoop?returns. ???defer?srv.loopWG.Done() ???defer?func()?{ ???????for?i?:=?0;?i?<?cap(slots);?i++?{ ???????????<-slots ??????} ??}() ???for?{ ???????//?Wait?for?a?free?slot?before?accepting. ???????<-slots ???????var?( ???????????fd??????net.Conn ???????????err?????error ???????????lastLog?time.Time ??????) ???????for?{ ???????????fd,?err?=?srv.listener.Accept() ???????????if?netutil.IsTemporaryError(err)?{ ???????????????if?time.Since(lastLog)?>?1*time.Second?{ ???????????????????srv.log.Debug("Temporary?read?error",?"err",?err) ???????????????????lastLog?=?time.Now() ??????????????} ???????????????time.Sleep(time.Millisecond?*?200) ???????????????continue ??????????}?else?if?err?!=?nil?{ ???????????????srv.log.Debug("Read?error",?"err",?err) ???????????????slots?<-?struct{}{} ???????????????return ??????????} ???????????break ??????} ???????remoteIP?:=?netutil.AddrIP(fd.RemoteAddr()) ???????if?err?:=?srv.checkInboundConn(remoteIP);?err?!=?nil?{ ???????????srv.log.Debug("Rejected?inbound?connection",?"addr",?fd.RemoteAddr(),?"err",?err) ???????????fd.Close() ???????????slots?<-?struct{}{} ???????????continue ??????} ???????if?remoteIP?!=?nil?{ ???????????var?addr?*net.TCPAddr ???????????if?tcp,?ok?:=?fd.RemoteAddr().(*net.TCPAddr);?ok?{ ???????????????addr?=?tcp ??????????} ???????????fd?=?newMeteredConn(fd,?true,?addr) ???????????srv.log.Trace("Accepted?connection",?"addr",?fd.RemoteAddr()) ??????} ???????go?func()?{ ???????????srv.SetupConn(fd,?inboundConn,?nil) ???????????slots?<-?struct{}{} ??????}() ??} } 这里的 SetupConn 主要执行执行握手协议,并尝试把链接创建为一个 peer 对象: //?SetupConn?runs?the?handshakes?and?attempts?to?add?the?connection //?as?a?peer.?It?returns?when?the?connection?has?been?added?as?a?peer //?or?the?handshakes?have?failed. func?(srv?*Server)?SetupConn(fd?net.Conn,?flags?connFlag,?dialDest?*enode.Node)?error?{ ???c?:=?&conn{fd:?fd,?flags:?flags,?cont:?make(chan?error)} ???if?dialDest?==?nil?{ ???????c.transport?=?srv.newTransport(fd,?nil) ??}?else?{ ???????c.transport?=?srv.newTransport(fd,?dialDest.Pubkey()) ??} ???err?:=?srv.setupConn(c,?flags,?dialDest) ???if?err?!=?nil?{ ???????c.close(err) ??} ???return?err } 在上述代码中又去调用了 srv.setupConn(c, flags, dialDest) 函数,该函数用于执行握手协议: func?(srv?*Server)?setupConn(c?*conn,?flags?connFlag,?dialDest?*enode.Node)?error?{ ???//?Prevent?leftover?pending?conns?from?entering?the?handshake. ???srv.lock.Lock() ???running?:=?srv.running ???srv.lock.Unlock() ???if?!running?{ ???????return?errServerStopped ??} ???//?If?dialing,?figure?out?the?remote?public?key. ???var?dialPubkey?*ecdsa.PublicKey ???if?dialDest?!=?nil?{????//?dest=nil?被动连接,dest!=nil主动连接诶 ???????dialPubkey?=?new(ecdsa.PublicKey) ???????if?err?:=?dialDest.Load((*enode.Secp256k1)(dialPubkey));?err?!=?nil?{ ???????????err?=?errors.New("dial?destination?doesn't?have?a?secp256k1?public?key") ???????????srv.log.Trace("Setting?up?connection?failed",?"addr",?c.fd.RemoteAddr(),?"conn",?c.flags,?"err",?err) ???????????return?err ??????} ??} ???//?Run?the?RLPx?handshake. ???remotePubkey,?err?:=?c.doEncHandshake(srv.PrivateKey)????//?公钥交换,确定共享秘钥RLPx层面的握手一来一去 ???if?err?!=?nil?{ ???????srv.log.Trace("Failed?RLPx?handshake",?"addr",?c.fd.RemoteAddr(),?"conn",?c.flags,?"err",?err) ???????return?err ??} ???if?dialDest?!=?nil?{ ???????c.node?=?dialDest ??}?else?{ ???????c.node?=?nodeFromConn(remotePubkey,?c.fd) ??} ???clog?:=?srv.log.New("id",?c.node.ID(),?"addr",?c.fd.RemoteAddr(),?"conn",?c.flags) ???err?=?srv.checkpoint(c,?srv.checkpointPostHandshake) ???if?err?!=?nil?{ ???????clog.Trace("Rejected?peer",?"err",?err) ???????return?err ??} ???//?Run?the?capability?negotiation?handshake. ???phs,?err?:=?c.doProtoHandshake(srv.ourHandshake)??//?进行协议层面的握手,也即p2p握手,一来一去 ???if?err?!=?nil?{ ???????clog.Trace("Failed?p2p?handshake",?"err",?err) ???????return?err ??} ???if?id?:=?c.node.ID();?!bytes.Equal(crypto.Keccak256(phs.ID),?id[:])?{ ???????clog.Trace("Wrong?devp2p?handshake?identity",?"phsid",?hex.EncodeToString(phs.ID)) ???????return?DiscUnexpectedIdentity ??} ???c.caps,?c.name?=?phs.Caps,?phs.Name ???err?=?srv.checkpoint(c,?srv.checkpointAddPeer)??//?状态校验 ???if?err?!=?nil?{ ???????clog.Trace("Rejected?peer",?"err",?err) ???????return?err ??} ???return?nil } 秘钥握手通过 deEncHandshake 函数实现,在函数之中调用了 Handshake() 函数: //?filedir:go-ethereum-1.10.2\p2p\transport.go?L123 func?(t?*rlpxTransport)?doEncHandshake(prv?*ecdsa.PrivateKey)?(*ecdsa.PublicKey,?error)?{ ???t.conn.SetDeadline(time.Now().Add(handshakeTimeout)) ???return?t.conn.Handshake(prv) } Handshake 代码如下所示,在这里会根据是主动握手还是被动握手来进行执行对应的握手逻辑: //?filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go???L253 func?(c?*Conn)?Handshake(prv?*ecdsa.PrivateKey)?(*ecdsa.PublicKey,?error)?{ ???var?( ???????sec?Secrets ???????err?error ??) ???if?c.dialDest?!=?nil?{???//主动握手 ???????sec,?err?=?initiatorEncHandshake(c.conn,?prv,?c.dialDest)??//主动发起秘钥验证握手结束,确定共享秘钥 ??}?else?{?//?被动握手 ???????sec,?err?=?receiverEncHandshake(c.conn,?prv) ??} ???if?err?!=?nil?{ ???????return?nil,?err ??} ???c.InitWithSecrets(sec) ???return?sec.remote,?err } 主动发起握手过程过程如下,在这里会调用 makeAuthMsg 来生成 Auth 身份信息,包含签名,随机nonce 生成的与签名对应的公钥和版本号,之后调用 sealEIP8 方法进行 rlpx编码,之后发起加密握手,之后接收返回的 authResp 消息,并验证解密,获取对方公钥,之后生成 AES,MAC: //?filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go?L477 func?initiatorEncHandshake(conn?io.ReadWriter,?prv?*ecdsa.PrivateKey,?remote?*ecdsa.PublicKey)?(s?Secrets,?err?error)?{ ???h?:=?&encHandshake{initiator:?true,?remote:?ecies.ImportECDSAPublic(remote)} ???authMsg,?err?:=?h.makeAuthMsg(prv) ???if?err?!=?nil?{ ???????return?s,?err ??} ???authPacket,?err?:=?sealEIP8(authMsg,?h) ???if?err?!=?nil?{ ???????return?s,?err ??} ???if?_,?err?=?conn.Write(authPacket);?err?!=?nil?{ ???????return?s,?err ??} ???authRespMsg?:=?new(authRespV4) ???authRespPacket,?err?:=?readHandshakeMsg(authRespMsg,?encAuthRespLen,?prv,?conn) ???if?err?!=?nil?{ ???????return?s,?err ??} ???if?err?:=?h.handleAuthResp(authRespMsg);?err?!=?nil?{ ???????return?s,?err ??} ???return?h.secrets(authPacket,?authRespPacket) } receiverEncHandshake如下所示,和initiatorEncHandshake相差无几: func?receiverEncHandshake(conn?io.ReadWriter,?prv?*ecdsa.PrivateKey)?(s?Secrets,?err?error)?{ ???authMsg?:=?new(authMsgV4) ???authPacket,?err?:=?readHandshakeMsg(authMsg,?encAuthMsgLen,?prv,?conn) ???if?err?!=?nil?{ ???????return?s,?err ??} ???h?:=?new(encHandshake) ???if?err?:=?h.handleAuthMsg(authMsg,?prv);?err?!=?nil?{ ???????return?s,?err ??} ???authRespMsg,?err?:=?h.makeAuthResp() ???if?err?!=?nil?{ ???????return?s,?err ??} ???var?authRespPacket?[]byte ???if?authMsg.gotPlain?{ ???????authRespPacket,?err?=?authRespMsg.sealPlain(h) ??}?else?{ ???????authRespPacket,?err?=?sealEIP8(authRespMsg,?h) ??} ???if?err?!=?nil?{ ???????return?s,?err ??} ???if?_,?err?=?conn.Write(authRespPacket);?err?!=?nil?{ ???????return?s,?err ??} ???return?h.secrets(authPacket,?authRespPacket) } 之后通过 doProtoHandshake 来完成协议握手操作,在这里调用 send 发送一次握手操作,之后通过readProtocolHandshake 来读取返回信息,之后进行检查: func?(t?*rlpxTransport)?doProtoHandshake(our?*protoHandshake)?(their?*protoHandshake,?err?error)?{ werr?:=?make(chan?error,?1) ???go?func()?{?werr?<-?Send(t,?handshakeMsg,?our)?}()??? ???if?their,?err?=?readProtocolHandshake(t);?err?!=?nil?{ ???????<-werr?//?make?sure?the?write?terminates?too ???????return?nil,?err ??} ???if?err?:=?<-werr;?err?!=?nil?{ ???????return?nil,?fmt.Errorf("write?error:?%v",?err) ??} ???//?If?the?protocol?version?supports?Snappy?encoding,?upgrade?immediately ???t.conn.SetSnappy(their.Version?>=?snappyProtocolVersion) ???return?their,?nil } 服务循环 run 函数是服务的主循环,监听服务器终止、增加信任节点、移除信任节点、增加检查节点等: func?(srv?*Server)?run()?{ ???srv.log.Info("Started?P2P?networking",?"self",?srv.localnode.Node().URLv4()) ???defer?srv.loopWG.Done() ???defer?srv.nodedb.Close() ???defer?srv.discmix.Close() ???defer?srv.dialsched.stop() ???var?( ???????peers????????=?make(map[enode.ID]*Peer) ???????inboundCount?=?0 ???????trusted??????=?make(map[enode.ID]bool,?len(srv.TrustedNodes)) ??) for?_,?n?:=?range?srv.TrustedNodes?{ ???????trusted[n.ID()]?=?true ??} running: ???for?{ ???????select?{ ???????case?<-srv.quit: break?running ???????case?n?:=?<-srv.addtrusted: srv.log.Trace("Adding?trusted?node",?"node",?n) ???????????trusted[n.ID()]?=?true ???????????if?p,?ok?:=?peers[n.ID()];?ok?{ ???????????????p.rw.set(trustedConn,?true) ??????????} ???????case?n?:=?<-srv.removetrusted: srv.log.Trace("Removing?trusted?node",?"node",?n) ???????????delete(trusted,?n.ID()) ???????????if?p,?ok?:=?peers[n.ID()];?ok?{ ???????????????p.rw.set(trustedConn,?false) ??????????} ???????case?op?:=?<-srv.peerOp: op(peers) ???????????srv.peerOpDone?<-?struct{}{} ???????case?c?:=?<-srv.checkpointPostHandshake: if?trusted[c.node.ID()]?{ c.flags?|=?trustedConn ??????????} c.cont?<-?srv.postHandshakeChecks(peers,?inboundCount,?c) ???????case?c?:=?<-srv.checkpointAddPeer: err?:=?srv.addPeerChecks(peers,?inboundCount,?c) ???????????if?err?==?nil?{ p?:=?srv.launchPeer(c) ???????????????peers[c.node.ID()]?=?p ???????????????srv.log.Debug("Adding?p2p?peer",?"peercount",?len(peers),?"id",?p.ID(),?"conn",?c.flags,?"addr",?p.RemoteAddr(),?"name",?p.Name()) ???????????????srv.dialsched.peerAdded(c) ???????????????if?p.Inbound()?{ ???????????????????inboundCount++ ??????????????} ??????????} ???????????c.cont?<-?err ???????case?pd?:=?<-srv.delpeer: ???????????//?A?peer?disconnected. ???????????d?:=?common.PrettyDuration(mclock.Now()?-?pd.created) ???????????delete(peers,?pd.ID()) ???????????srv.log.Debug("Removing?p2p?peer",?"peercount",?len(peers),?"id",?pd.ID(),?"duration",?d,?"req",?pd.requested,?"err",?pd.err) ???????????srv.dialsched.peerRemoved(pd.rw) ???????????if?pd.Inbound()?{ ???????????????inboundCount-- ??????????} ??????} ??} ???srv.log.Trace("P2P?networking?is?spinning?down") if?srv.ntab?!=?nil?{ ???????srv.ntab.Close() ??} ???if?srv.DiscV5?!=?nil?{ ???????srv.DiscV5.Close() ??} for?_,?p?:=?range?peers?{ ???????p.Disconnect(DiscQuitting) ??} for?len(peers)?>?0?{ ???????p?:=?<-srv.delpeer ???????p.log.Trace("<-delpeer?(spindown)") ???????delete(peers,?p.ID()) ??} } 节点信息 NodeInfo 用于查看节点信息,PeersInfo 用于查看连接的节点信息: //?NodeInfo?gathers?and?returns?a?collection?of?metadata?known?about?the?host. func?(srv?*Server)?NodeInfo()?*NodeInfo?{ node?:=?srv.Self() ???info?:=?&NodeInfo{ ???????Name:???????srv.Name, ???????Enode:??????node.URLv4(), ???????ID:?????????node.ID().String(), ???????IP:?????????node.IP().String(), ???????ListenAddr:?srv.ListenAddr, ???????Protocols:??make(map[string]interface{}), ??} ???info.Ports.Discovery?=?node.UDP() ???info.Ports.Listener?=?node.TCP() ???info.ENR?=?node.String() for?_,?proto?:=?range?srv.Protocols?{ ???????if?_,?ok?:=?info.Protocols[proto.Name];?!ok?{ ???????????nodeInfo?:=?interface{}("unknown") ???????????if?query?:=?proto.NodeInfo;?query?!=?nil?{ ???????????????nodeInfo?=?proto.NodeInfo() ??????????} ???????????info.Protocols[proto.Name]?=?nodeInfo ??????} ??} ???return?info } func?(srv?*Server)?PeersInfo()?[]*PeerInfo?{ ???//?Gather?all?the?generic?and?sub-protocol?specific?infos ???infos?:=?make([]*PeerInfo,?0,?srv.PeerCount()) ???for?_,?peer?:=?range?srv.Peers()?{ ???????if?peer?!=?nil?{ ???????????infos?=?append(infos,?peer.Info()) ??????} ??} ???//?Sort?the?result?array?alphabetically?by?node?identifier ???for?i?:=?0;?i?<?len(infos);?i++?{ ???????for?j?:=?i?+?1;?j?<?len(infos);?j++?{ ???????????if?infos[i].ID?>?infos[j].ID?{ ???????????????infos[i],?infos[j]?=?infos[j],?infos[i] ??????????} ??????} ??} ???return?infos } 请求处理 下面为 peer.run 函数的代码: func?(p?*Peer)?run()?(remoteRequested?bool,?err?error)?{ ???var?( ???????writeStart?=?make(chan?struct{},?1) ???????writeErr???=?make(chan?error,?1) ???????readErr????=?make(chan?error,?1) ???????reason?????DiscReason?//?sent?to?the?peer ??) ???p.wg.Add(2) ???go?p.readLoop(readErr) ???go?p.pingLoop() ???//?Start?all?protocol?handlers. ???writeStart?<-?struct{}{} ???p.startProtocols(writeStart,?writeErr) ???//?Wait?for?an?error?or?disconnect. loop: ???for?{ ???????select?{ ???????case?err?=?<-writeErr: ???????????//?A?write?finished.?Allow?the?next?write?to?start?if ???????????//?there?was?no?error. ???????????if?err?!=?nil?{ ???????????????reason?=?DiscNetworkError ???????????????break?loop ??????????} ???????????writeStart?<-?struct{}{} ???????case?err?=?<-readErr: ???????????if?r,?ok?:=?err.(DiscReason);?ok?{ ???????????????remoteRequested?=?true ???????????????reason?=?r ??????????}?else?{ ???????????????reason?=?DiscNetworkError ??????????} ???????????break?loop ???????case?err?=?<-p.protoErr: ???????????reason?=?discReasonForError(err) ???????????break?loop ???????case?err?=?<-p.disc: ???????????reason?=?discReasonForError(err) ???????????break?loop ??????} ??} ???close(p.closed) ???p.rw.close(reason) ???p.wg.Wait() ???return?remoteRequested,?err } 从上述代码中可以看到函数的开头首先定义了一些局部变量,之后启用了两个协程,一个是 readLoop,它通过调用 ReadMsg() 读取msg,之后又通过调用 peer.handle(msg) 来处理msg。 如果 msg 是 pingMsg,则发送一个 pong 回应,如果 msg 与下述特殊情况不相匹配则将 msg 交给proto.in 通道,等待 protocolManager.handleMsg() 从通道中取出。另一个协程是 pingLoop,它主要通过调用 SendItems(p.rw, pingMsg) 来发起ping请求。 之后调用 starProtocols() 函数让协议运行起来: func?(p?*Peer)?startProtocols(writeStart?<-chan?struct{},?writeErr?chan<-?error)?{ ???p.wg.Add(len(p.running)) ???for?_,?proto?:=?range?p.running?{ ???????proto?:=?proto ???????proto.closed?=?p.closed ???????proto.wstart?=?writeStart ???????proto.werr?=?writeErr ???????var?rw?MsgReadWriter?=?proto ???????if?p.events?!=?nil?{ ???????????rw?=?newMsgEventer(rw,?p.events,?p.ID(),?proto.Name,?p.Info().Network.RemoteAddress,?p.Info().Network.LocalAddress) ??????} ???????p.log.Trace(fmt.Sprintf("Starting?protocol?%s/%d",?proto.Name,?proto.Version)) ???????go?func()?{ ???????????defer?p.wg.Done() ???????????err?:=?proto.Run(p,?rw) ???????????if?err?==?nil?{ ???????????????p.log.Trace(fmt.Sprintf("Protocol?%s/%d?returned",?proto.Name,?proto.Version)) ???????????????err?=?errProtocolReturned ??????????}?else?if?err?!=?io.EOF?{ ???????????????p.log.Trace(fmt.Sprintf("Protocol?%s/%d?failed",?proto.Name,?proto.Version),?"err",?err) ??????????} ???????????p.protoErr?<-?err ??????}() ??} } 最后通过一个 loop 循环来处理错误或者断开连接等操作: //?Wait?for?an?error?or?disconnect. loop: ???for?{ ???????select?{ ???????case?err?=?<-writeErr: ???????????//?A?write?finished.?Allow?the?next?write?to?start?if ???????????//?there?was?no?error. ???????????if?err?!=?nil?{ ???????????????reason?=?DiscNetworkError ???????????????break?loop ??????????} ???????????writeStart?<-?struct{}{} ???????case?err?=?<-readErr: ???????????if?r,?ok?:=?err.(DiscReason);?ok?{ ???????????????remoteRequested?=?true ???????????????reason?=?r ??????????}?else?{ ???????????????reason?=?DiscNetworkError ??????????} ???????????break?loop ???????case?err?=?<-p.protoErr: ???????????reason?=?discReasonForError(err) ???????????break?loop ???????case?err?=?<-p.disc: ???????????reason?=?discReasonForError(err) ???????????break?loop ??????} ??} ???close(p.closed) ???p.rw.close(reason) ???p.wg.Wait() ???return?remoteRequested,?err 创数据库 newPersistentDB 函数用于创建一个持久化的数据库用于存储节点信息: //?filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go?L95 //?newPersistentNodeDB?creates/opens?a?leveldb?backed?persistent?node?database, //?also?flushing?its?contents?in?case?of?a?version?mismatch. func?newPersistentDB(path?string)?(*DB,?error)?{ ???opts?:=?&opt.Options{OpenFilesCacheCapacity:?5} ???db,?err?:=?leveldb.OpenFile(path,?opts) ???if?_,?iscorrupted?:=?err.(*errors.ErrCorrupted);?iscorrupted?{ ???????db,?err?=?leveldb.RecoverFile(path,?nil) ??} ???if?err?!=?nil?{ ???????return?nil,?err ??} currentVer?:=?make([]byte,?binary.MaxVarintLen64) ???currentVer?=?currentVer[:binary.PutVarint(currentVer,?int64(dbVersion))] ???blob,?err?:=?db.Get([]byte(dbVersionKey),?nil) ???switch?err?{ ???case?leveldb.ErrNotFound: ???????//?Version?not?found?(i.e.?empty?cache),?insert?it ???????if?err?:=?db.Put([]byte(dbVersionKey),?currentVer,?nil);?err?!=?nil?{ ???????????db.Close() ???????????return?nil,?err ??????} ???case?nil: ???????//?Version?present,?flush?if?different ???????if?!bytes.Equal(blob,?currentVer)?{ ???????????db.Close() ???????????if?err?=?os.RemoveAll(path);?err?!=?nil?{ ???????????????return?nil,?err ??????????} ???????????return?newPersistentDB(path) ??????} ??} ???return?&DB{lvl:?db,?quit:?make(chan?struct{})},?nil } 节点超时 ensureExpirer 函数用于检查节点是否超时,具体实现代码如下所示: func?(db?*DB)?ensureExpirer()?{ ???db.runner.Do(func()?{?go?db.expirer()?}) } func?(db?*DB)?expirer()?{ ???tick?:=?time.NewTicker(dbCleanupCycle) ???defer?tick.Stop() ???for?{ ???????select?{ ???????case?<-tick.C: ???????????db.expireNodes() ???????case?<-db.quit: ???????????return ??????} ??} } func?(db?*DB)?expireNodes()?{ ???it?:=?db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)),?nil) ???defer?it.Release() ???if?!it.Next()?{ ???????return ??} ???var?( ???????threshold????=?time.Now().Add(-dbNodeExpiration).Unix() ???????youngestPong?int64 ???????atEnd????????=?false ??) ???for?!atEnd?{ ???????id,?ip,?field?:=?splitNodeItemKey(it.Key()) ???????if?field?==?dbNodePong?{ ???????????time,?_?:=?binary.Varint(it.Value()) ???????????if?time?>?youngestPong?{ ???????????????youngestPong?=?time ??????????} ???????????if?time?<?threshold?{ ???????????????//?Last?pong?from?this?IP?older?than?threshold,?remove?fields?belonging?to?it. ???????????????deleteRange(db.lvl,?nodeItemKey(id,?ip,?"")) ??????????} ??????} ???????atEnd?=?!it.Next() ???????nextID,?_?:=?splitNodeKey(it.Key()) ???????if?atEnd?||?nextID?!=?id?{ if?youngestPong?>?0?&&?youngestPong?<?threshold?{ ???????????????deleteRange(db.lvl,?nodeKey(id)) ??????????} ???????????youngestPong?=?0 ??????} ??} } 状态更新 下面是一些状态更新函数: //?LastPingReceived?retrieves?the?time?of?the?last?ping?packet?received?from //?a?remote?node. func?(db?*DB)?LastPingReceived(id?ID,?ip?net.IP)?time.Time?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?time.Time{} ??} ???return?time.Unix(db.fetchInt64(nodeItemKey(id,?ip,?dbNodePing)),?0) } //?UpdateLastPingReceived?updates?the?last?time?we?tried?contacting?a?remote?node. func?(db?*DB)?UpdateLastPingReceived(id?ID,?ip?net.IP,?instance?time.Time)?error?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?errInvalidIP ??} ???return?db.storeInt64(nodeItemKey(id,?ip,?dbNodePing),?instance.Unix()) } //?LastPongReceived?retrieves?the?time?of?the?last?successful?pong?from?remote?node. func?(db?*DB)?LastPongReceived(id?ID,?ip?net.IP)?time.Time?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?time.Time{} ??} ???//?Launch?expirer ???db.ensureExpirer() ???return?time.Unix(db.fetchInt64(nodeItemKey(id,?ip,?dbNodePong)),?0) } //?UpdateLastPongReceived?updates?the?last?pong?time?of?a?node. func?(db?*DB)?UpdateLastPongReceived(id?ID,?ip?net.IP,?instance?time.Time)?error?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?errInvalidIP ??} ???return?db.storeInt64(nodeItemKey(id,?ip,?dbNodePong),?instance.Unix()) } //?FindFails?retrieves?the?number?of?findnode?failures?since?bonding. func?(db?*DB)?FindFails(id?ID,?ip?net.IP)?int?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?0 ??} ???return?int(db.fetchInt64(nodeItemKey(id,?ip,?dbNodeFindFails))) } //?UpdateFindFails?updates?the?number?of?findnode?failures?since?bonding. func?(db?*DB)?UpdateFindFails(id?ID,?ip?net.IP,?fails?int)?error?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?errInvalidIP ??} ???return?db.storeInt64(nodeItemKey(id,?ip,?dbNodeFindFails),?int64(fails)) } //?FindFailsV5?retrieves?the?discv5?findnode?failure?counter. func?(db?*DB)?FindFailsV5(id?ID,?ip?net.IP)?int?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?0 ??} ???return?int(db.fetchInt64(v5Key(id,?ip,?dbNodeFindFails))) } //?UpdateFindFailsV5?stores?the?discv5?findnode?failure?counter. func?(db?*DB)?UpdateFindFailsV5(id?ID,?ip?net.IP,?fails?int)?error?{ ???if?ip?=?ip.To16();?ip?==?nil?{ ???????return?errInvalidIP ??} ???return?db.storeInt64(v5Key(id,?ip,?dbNodeFindFails),?int64(fails)) } 节点挑选 QuerySeeds 函数用于从数据库里面随机挑选合适种子节点: //?QuerySeeds?retrieves?random?nodes?to?be?used?as?potential?seed?nodes //?for?bootstrapping. func?(db?*DB)?QuerySeeds(n?int,?maxAge?time.Duration)?[]*Node?{ ???var?( ???????now???=?time.Now() ???????nodes?=?make([]*Node,?0,?n) ???????it????=?db.lvl.NewIterator(nil,?nil) ???????id????ID ??) ???defer?it.Release() seek: ???for?seeks?:=?0;?len(nodes)?<?n?&&?seeks?<?n*5;?seeks++?{ ctr?:=?id[0] ???????rand.Read(id[:]) ???????id[0]?=?ctr?+?id[0]%16 ???????it.Seek(nodeKey(id)) ???????n?:=?nextNode(it) ???????if?n?==?nil?{ ???????????id[0]?=?0 ???????????continue?seek?//?iterator?exhausted ??????} ???????if?now.Sub(db.LastPongReceived(n.ID(),?n.IP()))?>?maxAge?{ ???????????continue?seek ??????} ???????for?i?:=?range?nodes?{ ???????????if?nodes[i].ID()?==?n.ID()?{ ???????????????continue?seek?//?duplicate ??????????} ??????} ???????nodes?=?append(nodes,?n) ??} ???return?nodes } func?nextNode(it?iterator.Iterator)?*Node?{ ???for?end?:=?false;?!end;?end?=?!it.Next()?{ ???????id,?rest?:=?splitNodeKey(it.Key()) ???????if?string(rest)?!=?dbDiscoverRoot?{ ???????????continue ??????} ???????return?mustDecodeNode(id[:],?it.Value()) ??} ???return?nil } 文末小结 P2P网络是区块链分布式网络结构的基础,本篇文章详细介绍了P2P网络的基本原理,包括节点发现机制、分布式哈希表、节点查找、节点新增、节点移除、请求处理等,同时从源码角度对以太坊源码中P2P网络的实现做了较为细致的分析,探索了以太坊P2P网络的工作流程以以及安全设计,而公链安全体系的建设依旧是长路漫漫,有待进一步深入探索。 查看更多 —- 编译者/作者:sky110 玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。 |
深入理解以太坊P2P网络设计(下)
2021-09-24 sky110 来源:区块链网络
LOADING...
相关阅读:
- 1079:为什么没能拿住比特币和以太坊?2021-09-24
- Tezos 进入阻力区; XTZ能否延续之前的涨幅?2021-09-24
- Optimism 公布重要更新计划:除了兼容 EVM,还有什么?2021-09-24
- 币圈王哥:2021.9.24晚间多币种走势分析2021-09-24
- 选择平台:以太坊还是波卡?两者有哪些关键差异点?2021-09-24