经过前面的分析,我们已经明白了 IPFS 启动过程,从今天起,我会分析一些常见的命令或动作,希望大家喜欢。 在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。 今天,我们看下第一个最常用的add命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作,add命令就是干这个的,闲话少数,我们来看一段代码。 const{createNode}=require('ipfs') constnode=createNode({ libp2p:{ config:{ dht:{ enabled:true } } } }) node.on('ready',async()=>{ constcontent=`我爱黑萤`; constfilesAdded=awaitnode.add({ content:Buffer.from(content) },{ chunkerOptions:{ maxChunkSize:1000, avgChunkSize:1000 } }) console.log('Addedfile:',filesAdded[0].path,filesAdded[0].hash) }) 这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。在程序中,通过调用 IPFS 节点的add方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。 add方法位于core/components/files-regular/add.js文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把core/components/files-regular目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的add.js文件。下面,我们直接看这个函数的执行流程。 这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的add函数,后者才是主体,它的逻辑如下: 首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。if(typeofoptions==='function'){ callback=options options={} } 定义检测内容的工具函数来检测我们要上传的内容。constisBufferOrStream=obj=>Buffer.isBuffer(obj)||isStream.readable(obj)||isSource(obj) constisContentObject=obj=>{ if(typeofobj!=='object')returnfalse if(obj.content)returnisBufferOrStream(obj.content) returnBoolean(obj.path)&&typeofobj.path==='string' } constisInput=obj=>isBufferOrStream(obj)||isContentObject(obj) constok=isInput(data)||(Array.isArray(data)&&data.every(isInput)) if(!ok){ returncallback(newError('invalidinput:expectedbuffer,readablestream,pullstream,objectorarrayofobjects')) } 接下来,执行 pull-stream 类库提供的pull函数。我们来看pull函数的主要内容。它的第一个参数是pull.values函数执行的结果,这个values函数就是一个 source 流,它返回一个称为read的函数来读取我们提供的数据。这个read函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。第二个参数是 IPFS 对象的addPullStream方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的add-pull-stream.js文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。 第三个参数是pull-sort中定义的函数,这是一个依赖于pull-stream的库,根据一定规则来排序,这个函数我们不用管。 最后一个参数是pull.collect函数执行的结果,这个collect函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的filesAdded之所以是一个数组就是拜这个函数所赐。 上面逻辑的代码如下: pull( pull.values([data]), self.addPullStream(options), sort((a,b)=>{ if(a.path<b.path)return1 if(a.path>b.path)return-1 return0 }), pull.collect(callback) ) 在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。现在,我们来看addPullStream方法,这个方法是保存内容的主体,add方法是只开胃小菜。addPullStream方法执行逻辑如下: 调用parseChunkerString函数,处理内容分块相关的选项。这个函数位于相同目录下的utils.js文件中,它检查用户指定的分块算法。如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为rabin类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:parseChunkerString=(chunker)=>{ if(!chunker){ return{ chunker:'fixed' } }elseif(chunker.startsWith('size-')){ constsizeStr=chunker.split('-')[1] constsize=parseInt(sizeStr) if(isNaN(size)){ thrownewError('Chunkerparametersizemustbeaninteger') } return{ chunker:'fixed', chunkerOptions:{ maxChunkSize:size } } }elseif(chunker.startsWith('rabin')){ return{ chunker:'rabin', chunkerOptions:parseRabinString(chunker) } }else{ thrownewError(Unrecognizedchunkeroption:${chunker}) } } 注意:我们也可以通过重写这个函数来增加自己的分割算法。合并整理选项变量。constopts=Object.assign({},{ shardSplitThreshold:self._options.EXPERIMENTAL.sharding ?1000 :Infinity },options,chunkerOptions) 设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。if(opts.hashAlg&&opts.cidVersion!==1){ opts.cidVersion=1 } 设置进度处理函数,默认空实现。constprog=opts.progress||noop constprogress=(bytes)=>{ total+=bytes prog(total) } opts.progress=progress 用pull函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。首先调用pull.map方法对保存的内容进行处理。pull.map方法是 pull-stream 流中的一个 source 流,它对数组中的每个元素使用指定的处理函数进行处理。这就是我们在add函数中把需要保存的内容转化为数组的原因。在这里,对每个数组元素进行处理的函数是normalizeContent。这个函数定义在同一个文件中,它首先检查保存的内容是否为数组,如果不是则转化为数组;然后,对数组中的每一个元素进行处理,具体如下:如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。if(Buffer.isBuffer(data)){ data={path:'',content:pull.values([data])} } 如果保存的内容是一个 Node.js 可读流,比如文件,则把要保存的转化为路径为空字符串,内容使用 stream-to-pull-stream 类的source方法库把 Node.js 可读流转化为 pull-stream 的 source 流对象。if(isStream.readable(data)){ data={path:'',content:toPull.source(data)} } 如果保存的内容是 pull-stream 的 source 流,则把要保存的内容转化为路径为空字符串,内容不变的对象。if(isSource(data)){ data={path:'',content:data} } 如果要保存的内容是一个对象,并且content属性存在,且不是函数,则进行如下处理:if(data&&data.content&&typeofdata.content!=='function'){ if(Buffer.isBuffer(data.content)){ data.content=pull.values([data.content]) } if(isStream.readable(data.content)){ data.content=toPull.source(data.content) } } 如果指定的是路径,则进行下面的处理。if(opts.wrapWithDirectory&&!data.path){ thrownewError('Mustprovideapathwhenwrappingwithadirectory') } if(opts.wrapWithDirectory){ data.path=WRAPPER+data.path } 返回最终生成的要保存的内容。调用pull.flatten()方法,把前上步生成的数组进行扁平化处理。flatten方法是一个 through 流,主要是把多个流或数组流转换为一个流,比如把多个数组转换成一个数组,比如:[ [1,2,3], [4,5,6], [7,8,9] ] 这样的数组使用这个方法处理后,最终会变成下面的数组[1,2,3,4,5,6,7,8,9] 调用importer函数来保存内容。这个函数定义在ipfs-unixfs-importer类库中,这个类库是 IPFS 用于处理文件的布局和分块机制的 JavaScript 实现,具体如何保存内容,如何分块我们将在下篇文章中进行详细分析。调用pull.asyncMap方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。当程序执行到这里时,我们要保存的文件或内容已经保存在本地 IPFS 仓库,已经可以用使用cat、get、ls等命令来 API 来查看我们保存的内容或文件了。asyncMap方法是一个 through 流,类似于map流,但是有更好的性能。它会对每一个数组元素进行处理,这里处理函数为prepareFile。这个函数定义在同一个文件中,它的处理具体如下: 使用已经生成文件的multihash内容生成 CID 对象。letcid=newCID(file.multihash) CID 构造方法会检查传入的参数,如果是 CID 对象,则直接从对象中取出版本号、编码方式、多哈希等属性;如果是字符串,则又分为是否被 multibase 编码过,如果是则需要先解码,然后再分离出各种属性,如果没有经过 multibase 编码,那么肯定是 base58 字符串,则设置版本为0,编码方式为dag-pb,再从 base58 串中获取多哈希值;如果是缓冲对象,则取得第一个字节,并按十六进制转化成整数,如果第一个字节是 0或1,则生成各自属性,否则为多哈希,则设置版本为0,编码方式为dag-pb。如果用户指定 CID 版本为 1,则生成 CID 对象到版本1.if(opts.cidVersion===1){ cid=cid.toV1() } 接下来,调用waterfall方法,顺序处理它指定的函数。第一个函数,检查配置选项是否指定了onlyHash,即不实际地上传文件到IFS网络,仅仅计算一下这个文件的 HASH,那么直接调用第二个函数,否则,调用 IPFS 对象的object.get方法来获取指定文件在仓库中保存的节点信息。这个方法我们后面会专门讲解,这里略去不讲。第二个函数,生成最终返回给用户的对象,这个对象包括了:path、size、hash 等。上面代码如下,比较简单,可自己阅读。 waterfall([ (cb)=>opts.onlyHash ?cb(null,file) :self.object.get(file.multihash,Object.assign({},opts,{preload:false}),cb), (node,cb)=>{ constb58Hash=cid.toBaseEncodedString() letsize=node.size if(Buffer.isBuffer(node)){ size=node.length } cb(null,{ path:opts.wrapWithDirectory ?file.path.substring(WRAPPER.length) :(file.path||b58Hash), hash:b58Hash, size }) } ],callback) 调用pull.map方法,把已经保存到本地的文件预加载到指定节点。map是一个 through 流,它会对每一个数组元素进行处理,这里处理函数为preloadFile。这个函数定义在同一个文件中,这会把已经保存的文件预加载到指定的节点,具体保存在哪些节点,可以参考《精通IPFS:系统启动之概览》篇中preload.addresses,也可以手动指定。调用pull.asyncMap方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。asyncMap方法是一个 through 流,这里处理函数为pinFile。pin 操作后面我们会详细分析,这里略过不提,读者可以自行阅读相关代码。—- 编译者/作者:星鉴网 玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。 |
精通IPFS:IPFS 保存内容之上篇
2020-06-10 星鉴网 来源:区块链网络
LOADING...
相关阅读:
- 本周主要内容:比特币从$ 12,000美元下跌以及DeFi行业和Grayscale的新记录2020-08-03
- 比特币不会突破12,000美元-分析师现在在说什么2020-08-02
- 微文又中奖入围啦快来参与/炒股的一点点心得体会,你是不是也遇到过2020-08-02
- 导读—您将从这里看到哪些真正有意义和价值的内容?2020-08-02
- 福布斯公布六大重点内容,剑指IPFS!!2020-07-30