在上一篇文章中,我们指出在 builder/builder.js 文件中调用调用 pull 函数进行保存文件,这篇文章我们就来详细研究下这个过程。 1,设置源流为 file.content。 2,调用 chunker 流,对保存的内容进行分块。通过前面的文章,我们知道 chunker 流的默认实现为 chunker/fixed-size.js,它是一个 pull-through 流。 这个流提供了两个函数,分别称为 onData 和 onEnd,前者在每次数据到来时调用,后者当数据发送完成时调用。fixed-size.js 在初始化时,根据选项中指定的maxChunkSize 属性设置每一个区块的大小。 下面,我们来看下它的在它的 onData 和 onEnd 两个方法。 onData 函数处理如下: 以上就 IPFS 中固定分块的逻辑,其实也很简单。 从缓冲列表中取得规定的区块大小数据到队列中。 this.queue(bl.slice(0,maxSize)) 如果缓冲列表的长度刚好等于规定的区块大小,那么重新一个新的缓冲区列表,并将当前数据长度设置为 0; 否则,生成一个新的缓冲区列表,并从老缓冲区中区块大小处把数据读取到新的缓冲区列表中(0 到区块大小处的数据已经在上面读取过) 同时设置其为老的缓冲区列表,并更新当前数据长度设减去前一步读取到区块大小长度,从而更缓冲区列表及其长度。 if(maxSize===bl.length){ bl=newBufferList() currentLength=0 }else{ onstnewBl=newBufferList() newBl.append(bl.shallowSlice(maxSize)) bl=newBl currentLength-=maxSize} 每次收到数据之后就保存在 BufferList 中,同时把当前数据长度也加上读取到数据的长度。 bl.append(buffer) currentLength += buffer.lengt 如果当前数据长度大于等于规定的区块大小时,那么就进行下面的循环处理,直到当前数据长度小于规定的区块大小。 看完了 onData 方法,接下来我们再看 onEnd 函数,这个函数首先检查缓冲列表中是否有数据(少于区块大小),如果有则同样保存到队列中。 if(currentLength) {this.queue(bl.slice(0,currentLength)) emitted=true} if(!emitted) { this.queue(Buffer.alloc(0))} this.queue(null) 调用 paraMap 流(类型为 pull-paramap),对每一个分块进行处理。 当前面的流对文件进行分块之后,每一个分区都会下一个流进行拉取,在这里就是这个函数,我们看下这个函数是如何处理每一个分块的。 它的主体是一个 waterfall 函数,这个函数正如其名字所示,每一个函数都进行各自的处理,并把结果传递给下一个函数,我们看下它的几个处理函数。 首先,我们来看第一个函数,它主要用来创建 DAGNode,并把相关信息传递给第二个函数,它的执行逻辑如下: 接下来,我们看第二个函数,它的主要作用是把生成的 DAGNode 保存到系统中,并把保存的结果传递给下一个函数,它的执行逻辑如下: 从选项中获取 CID 版本号、哈希算法、编码方式等。 letcidVersion=options.cidVersion||defaultOptions.cidVersion lethashAlg=options.hashAlg||defaultOptions.hashAlg letcodec=options.codec||defaultOptions.codec if(Buffer.isBuffer(node)){ cidVersion=1 codec='raw'} if(hashAlg!=='sha2-256'){ cidVersion=1} 默认情况下,版本号为0,哈希算法为 SHA256,编码方式为 dag-pb,这是一种基于 Protocol 规定的 JS 实现。 如果选项中指定不保存而仅仅是计算哈希值,那么调用 ipld-dag-pb 库中的 util.js 中的 cid 函数,获取 DAG 节点的 CID,然后直接返回。 if(options.onlyHash){ returncid(node,{ version:cidVersion, hashAlg:hashAlg}, (err,cid)=>{ callback(err,{cid, node})})} 如果不是只计算哈希,那么调用 IPLD 对象的 put 来保存 DAG 节点。 ipld.put(node,{ version:cidVersion, hashAlg:hashAlg, format:codec}, (error,cid)=>{ callback(error, {cid, node }) }) IPLD 对象定义于 ipld 库中。 IPLD 在 IPFS 中具有非常重要的作用,它是 InterPlanetary Linked-Data 的缩写,代表了 IPFS 的野心与希望,把一切东西连结起来的愿望. 目前可以边结比特币、以太坊、Zcash、git 等。它持有 ipfs-block-service,后者又持有 ipfs 仓库对象和 bitswap 对象,这几个对象构成了 ipfs 的核心。 下面我们来看 put 方法,看它是怎么来保存 DAG 对象的。 它的主体是调用内部方法获取当前 DAG 对象编码用的格式,然后使用与这种格式相匹配的 cid 方法来取得对象的 CID 对象,然后调用内部的_put 来保存数据。 this._getFormat(options.format,(err,format)=>{ if(err)returncallback(err) format.util.cid(node,options,(err,cid)=>{ if(err){ returncallback(err)} if(options.onlyHash){ returncallback(null,cid)} this._put(cid,node,callback) }) }) 接下来,我们来看这个内部_put 方法,这个方法主体是一个 waterfall 函数,它内部的几个函数分别根据 CID 对象获得对应的编码格式,然后使用编码格式对应的方法序列化 DAG 节点对象,最后生成区块 Block 对象,并调用区块服务对象的 put 方法来保存区块。 区块服务对象定义于 ipfs-block-service 库,它的 put 方法,根据是否有 bitswap 对象(初始化是这个对象为空)来决定是调用仓库对象来保存区块,还是调用 bitswap 来保存区块。对于我们的例子来说,它会调用 bitswap 来保存区块。 bitswap 对象的 put 方法,不仅会把区块保存在底层的 blockstore 中,还会把它发送给那些需要它的节点。它的主体是一个 waterfall 函数,其中第一个函数检查本地区块存储是否有这个区块,第二个根据本地是否有这个区块来确定是否忽略调用,还是真正来保存区块。 waterfall([ (cb)=>this.blockstore.has(block.cid,cb), (has,cb)=>{ if(has){ returnnextTick(cb) } this._putBlock(block,cb) } ],callback) bitswap 对象的_putBlock 方法调用区块存储对象的 put 方法在本地仓库中保存区块对象,并在成功之后触发一个收到区块的事件. 同时通过网络对象的 provide 方法,从而把 CID 保存在最近的节点中,然后调用引擎对象的 receivedBlocks 方法,把接收到的区块对象发送到所有想要这个区块的所有节点中。 this.blockstore.put(block,(err)=>{ if(err){ returncallback(err) } this.notifications.hasBlock(block) this.network.provide(block.cid,(err)=>{ if(err){ this._log.error('Failedtoprovide:%s',err.message) }}) this.engine.receivedBlocks([block.cid]) callback() }) bitswap 对象中有两个重要的对象,一个是网络对象,一个是引擎对象。 网络对象的 provide 方法直接调用 libp2p 对象的内容路由的同名方法来处理区块的 CID。 libp2p 对象的内容路由中保存所有具体的路由方法,默认情况下,是空的,即没有任何路由方法。 而我们通过在配置文件中,指定 libp2p.config.dht.enabled 为真,为内容路由指定了 DHT 路由。 所以最终区块的 CID 会被保存在最合适的节点中。 网络对象在初始方法中,指定了自身的两个方法作为 libp2p 对象的节点连接与断开事件的处理器,从而在连接与断开时获得相应的通知。 并且还调用了 libp2p 对象的 handle 方法,从而使自己成为 libp2p 对象/ipfs/bitswap/1.0.0和/ipfs/bitswap/1.1.0这两种协义的处理对象,从而当 libp2p 收到这两种消息时,会调用网络对象对象的相应方法进行处理。 网络对象处理 bitswap 协义是通过 pull 函数处理的,大致流程如下:从连接对象中获取消息,反序列化成为消息对象,然后通过连接对象获取它的节点信息对象。 调用 bitswap 对象的内部方法_receiveMessage 处理传递进来的消息,而这个方法又会调用引擎对象的 messageReceived 方法来处理接收到的消息。 引擎对象的 messageReceived 方法的大致流程如下: 1)调用内部方法_findOrCreate,找到或创建远程对等节点的总账本对象 Ledger,如果是新创建的总账本对象,还要放入内部映射集合中,key 为远程对等节点的 Base58 字符串; 2)如果这个消息是完全的消息,则生成一个新的想要请求列表。 3)调用内部方法_processBlocks,处理消息中的区块对象。 4)如果消息中的想要列表为空的,则退出方法。 5)遍历消息中的想要列表,如果当前想要的实体被取消,则从对应的节点的总账本中去掉对应项,同时保存在取消项列表中;否则,把当前项保存在对应节点的总账本中,同时保存在想要列表中。 6)调用内部方法_cancelWants ,把任务中已经取消的过滤掉,即删除任务中已经取消的任务。 7)调用内部方法_addWants,处理远程对等节点所有想要的列表。调用区块存储对象判断想要的项本地仓库中是否已经有,如果已经有,则生成相应的任务。 引擎对象的 receivedBlocks 方法在收到具体区块时,检查所有已连接的远程节点(总账本对象),看它们是否想要这个区块,如果是则生成一个任务,在后台进行处理。 调用 persist 方法,保存 DAG 节点。这是非常重要的一步,它不仅把区块对象保存在本地仓库,也涉及与是否把区块 CID 保存在与它最近的节点上,还涉及到把区块通过 bitswap 协义发送到那些想要它的节点中。它的执行如下: 生成一个 UnixFS 对象。 constfile=newUnixFS(options.leafType,buffer) UnixFS 是一种基于协议缓冲区的格式,用于描述IPFS中的文件,目录和符号链接。目前它支持:原始数据、目录、文件、原数据、符号连接、hamt-sharded-directory 等几种类型。 leafType 默认为文件,在文件初始化时通过默认选项 defaultOptions 指定的。 调用 DAGNode.create 静态方法,创建 DAGNode 节点,成功之后,把相信信息传递下一个函数。 DAGNode.create(file.marshal(),[],(err,node)=>{ if(err){ returncb(err) } cb(null,{size:node.size, leafSize:file.fileSize(), data:node})}) UnixFS 的 marshal 方法主要内容是对文件内容(字节缓冲区)进行编码。 这里 DAGNode 引用的是 ipld-dag-pb 库中的 dag-node/index.js 中定义的 DAGNode 函数对象. 它的 create 方法,定义于同一个目录下的 create.js 中,我们来看下这个方法。 它的主要内容是对文件的分区数据和对其他区块的连接 link 进行检查,并把两者序列后之后再创建 DAGNode 对象。 而后者的构造函数比较简单,仅把区块的数据及与其他区块的连接(代表与其他区块的关系)保存起来。 调用 pullThrough 流(类型为 pull-through 流),对收到的每个数据进行处理。这个过程比较简单,这里不细讲。 调用 reducer 流,把所有生成的分块进行归一处理。在默认情况下,reducer 流是在 balanced/index.js中通过调用 balanced/balanced-reducer.js 中的 balancedReduceToRoot 的函数生成的。 我们看下这个函数的执行过程: 第一个函数是前面建立的 source 流。 第二个函数是一个 pull-batch 类库定义的流,这是一个 pull-through 流,它实现了自己的 writer、ender 两个函数,它把每次获取到的数据保存在内部数组中,达到一定程序之后才会保存到 pull-through 流的队列中。 第三个函数是 pull-stream 类库的 async-map 流,这是一个 through 流,与 map 流相似,但有更好的性能。 它的归一处理函数 reduce 默认情况下为 builder/reduce.js 中返回的 reducefile 函数。 它的流程如下:1)如果当前叶子节点数量是1,并且其 single 标志为真,并且选项中有配置把单独叶子归一到自身,那么直接调用回调对象; 否则,执行下面的流。 if(leaves.length===1&&leaves[0].single&&options.reduceSingleLeafToSelf){ constleaf=leaves[0] returncallback(null,{ size:leaf.size, leafSize:leaf.leafSize, multihash:leaf.multihash, path:file.path, name:leaf.name})} 创建父节点,并添加它的所有叶子节点。当文件比较大的时候,IPFS 会进行分块,每一个分块就构成了这里的叶子节点. 最终这些叶子按照它们分块的顺序,生成对应的 DAGLink ,然后依次添加到父 DAGNode 中. 这时候父 DAGNode 保存的不是文件内容,而是这些叶子节点的 DAGLink,从而构成文件的完整内容。 constf=newUnixFS('file') constlinks=leaves.map((leaf)=>{ f.addBlockSize(leaf.leafSize) returnnewDAGLink(leaf.name,leaf.size,leaf.multihash)}) 调用 waterfall 函数,顺序处理父节点。这个地方和处理单个分块类似,就是创建 DAGNode 对象、调用 persist 函数进行持久化处理。 注意:这里的区别是父节点有叶子节点,即 links 不空。 waterfall([ (cb)=>DAGNode.create(f.marshal(),links,cb), (node,cb)=>persist(node,ipld,options,cb) ],(error,result)=>{ if(error){ returncallback(error)} callback(null,{ size:result.node.size, leafSize:f.fileSize(), multihash:result.cid.buffer, path:file.path, name:''})}) 上面 waterfall 函数处理完成后,调用回调函数进行继续处理。 归一处理函数 reduce 中的回调函数是下面 collect 流即 sink 流中的读取回调函数,当归一函数读取到数据之后,调用这个回调函数,从而数据 pull 到 collect 流,进而进入 reduced 函数中进行处理。 第四个函数是 pull-stream 类库的 collect 流,这是一个 sink 流。它的处理函数 reduced 流程如下:1)如果前面的流有错误,则直接调用 reduceToParents 函数的回调函数进行处理; 否则,如果当前收到的数据长度大于1,即前面归一处理之后,还是有多个根 DAGNode,则调用 reduceToParents 函数继续进行归一处理; 否则,调用 reduceToParents 函数的回调函数进行处理。 reduceToParents 函数的回调函数,这是一个很关键的函数,在这个函数内部把读取到的数据写入 result 表示的 pull-pushable 流,以便在它后面的外部流流获取数据。 生成 pull-pair 对象和 pull-pushable 对象。 constpair=pullPair() constsource=pair.source constresult=pushable() 调用 reduceToParents 函数,建立内部 pull 流。函数的主体就是一个 pull 函数建立起来的流,它的几个函数如下: 返回双向流对象。这里返回的双向流对象为 {sink:pair.sink, source:result} 其中 sink 是 pull-pair 类库中定义的 sink 流,它被外部的 pull 函数调用用来从前面一个流中读取数据;source 是 pull-pushable 类库中的流。 在 reduceToParents 函数的回调函数中被 push 数据,从而外部的 pull 函数中相关的流可以从它中读取函数。 调用 collect 流,在这个流的处理函数中,把保存文件的结果传递到外部函数中。 collect((err,roots)=>{ if(err){ callback(err) }else{ callback(null,roots[0])} }) 这里的 callback 是调用 createAndStoreFile 函数时传递进来的,而它的调用是在 builer/builder.js文件中,简单回顾一下调用代码: createAndStoreFile(item,(err,node)=>{ if(err){returncb(err)} if(node){ source.push(node)} cb()}) 这里的匿名回调函数即是上面的 callback,在回调函数中,通过保存文件的结果写入 source 流中,从而把数据传递到更外层的 pull 流中。 到这里,我们已经把保存文件/内容这一核心流程完整分析了一遍,从头看到尾的你是不是收获很大。接下来,敬请期待下篇获取内容。 —- 编译者/作者:星鉴网 玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。 |
精通IPFS:IPFS 保存内容之下篇
2020-06-10 星鉴网 来源:区块链网络
LOADING...
相关阅读:
- 第630篇:李笑来说Defi最近还得出事……2020-08-01
- SEC将监控合同授予Ciphertrace,美国监管机构开始涉足Binance链2020-08-01
- 读币圈头条拿3000key红包||5张图来看以太坊的发展史。2020-08-01
- 以太坊的五年:从少年梦到市值380亿美元区块链2020-08-01
- 区块链板块大热2020-08-01