LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 区块链资产 > 精通IPFS:IPFS 保存内容之上篇

精通IPFS:IPFS 保存内容之上篇

2020-06-10 星鉴网 来源:区块链网络

经过前面的分析,我们已经明白了 IPFS 启动过程,从今天起,我会分析一些常见的命令或动作,希望大家喜欢。

在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。

pull-stream 是一个新型的流库,数据被从源中拉取到目的中,它有两种基本类型的流:Source 源和 Sink 接收器。除此之外,有两种复合类型的流:Through 通道流(比如转换)和 Duplex 双向流。

source 流,这类流返回一个匿名函数,这个匿名函数被称为 read 函数,它被后续的 sink 流函数或 through 流函数调用,从而读取 source 流中的内容。

sink 流,这类流最终都返回内部 drain.js 中的 sink 函数。这类流主要是读取数据,并且对每一个读取到的数据进行处理,如果流已经结束,则调用用户指定结束函数进行处理。

through 流,这类流的函数会返回嵌套的匿名函数,第一层函数接收一个 source 流的 read 函数或其他 through 函数返回的第一层函数为参数,第二层函数接收最终 sink 提供的写函数或其他 through 返回的第二层函数,第二层函数内部调用 read 函数,从而直接或间接从 source 中取得数据,获取数据后直接或间接调用 sink 函数,从而把数据写入到目的地址。

在 pull-streams 中,数据在流动之前,必须有一个完整的管道,这意味着一个源、零个或多个通道、一个接收器。但是仍然可以创建一个部分化的管道,这非常有用。也就是说可以创建一个完整的管道,比如pull(source, sink) => undefined,也可以部分化的管道,比如pull(through, sink) => sink,或者pull(through1, through2) => through,我们在下面会大量遇到这种部分化的管道。

今天,我们看下第一个最常用的add命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作,add命令就是干这个的,闲话少数,我们来看一段代码。

const{createNode}=require('ipfs')

constnode=createNode({ libp2p:{ config:{ dht:{ enabled:true } } } })

node.on('ready',async()=>{

constcontent=`我爱黑萤`;

constfilesAdded=awaitnode.add({ content:Buffer.from(content) },{ chunkerOptions:{ maxChunkSize:1000, avgChunkSize:1000 } })

console.log('Addedfile:',filesAdded[0].path,filesAdded[0].hash) })

这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。在程序中,通过调用 IPFS 节点的add方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。

add方法位于core/components/files-regular/add.js文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把core/components/files-regular目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的add.js文件。下面,我们直接看这个函数的执行流程。

这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的add函数,后者才是主体,它的逻辑如下:

首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。

if(typeofoptions==='function'){ callback=options options={} }

定义检测内容的工具函数来检测我们要上传的内容。

constisBufferOrStream=obj=>Buffer.isBuffer(obj)||isStream.readable(obj)||isSource(obj) constisContentObject=obj=>{ if(typeofobj!=='object')returnfalse if(obj.content)returnisBufferOrStream(obj.content) returnBoolean(obj.path)&&typeofobj.path==='string' }

constisInput=obj=>isBufferOrStream(obj)||isContentObject(obj) constok=isInput(data)||(Array.isArray(data)&&data.every(isInput))

if(!ok){ returncallback(newError('invalidinput:expectedbuffer,readablestream,pullstream,objectorarrayofobjects')) }

接下来,执行 pull-stream 类库提供的pull函数。我们来看pull函数的主要内容。它的第一个参数是pull.values函数执行的结果,这个values函数就是一个 source 流,它返回一个称为read的函数来读取我们提供的数据。这个read函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。

第二个参数是 IPFS 对象的addPullStream方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的add-pull-stream.js文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。

第三个参数是pull-sort中定义的函数,这是一个依赖于pull-stream的库,根据一定规则来排序,这个函数我们不用管。

最后一个参数是pull.collect函数执行的结果,这个collect函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的filesAdded之所以是一个数组就是拜这个函数所赐。

上面逻辑的代码如下:

pull( pull.values([data]), self.addPullStream(options), sort((a,b)=>{ if(a.path<b.path)return1 if(a.path>b.path)return-1 return0 }), pull.collect(callback) )

在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。

现在,我们来看addPullStream方法,这个方法是保存内容的主体,add方法是只开胃小菜。addPullStream方法执行逻辑如下:

调用parseChunkerString函数,处理内容分块相关的选项。这个函数位于相同目录下的utils.js文件中,它检查用户指定的分块算法。如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为rabin类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:

parseChunkerString=(chunker)=>{ if(!chunker){ return{ chunker:'fixed' } }elseif(chunker.startsWith('size-')){ constsizeStr=chunker.split('-')[1] constsize=parseInt(sizeStr) if(isNaN(size)){ thrownewError('Chunkerparametersizemustbeaninteger') } return{ chunker:'fixed', chunkerOptions:{ maxChunkSize:size } } }elseif(chunker.startsWith('rabin')){ return{ chunker:'rabin', chunkerOptions:parseRabinString(chunker) } }else{ thrownewError(Unrecognizedchunkeroption:${chunker}) } }

注意:我们也可以通过重写这个函数来增加自己的分割算法。合并整理选项变量。

constopts=Object.assign({},{ shardSplitThreshold:self._options.EXPERIMENTAL.sharding ?1000 :Infinity },options,chunkerOptions)

设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。

if(opts.hashAlg&&opts.cidVersion!==1){ opts.cidVersion=1 }

设置进度处理函数,默认空实现。

constprog=opts.progress||noop constprogress=(bytes)=>{ total+=bytes prog(total) }

opts.progress=progress

用pull函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。首先调用pull.map方法对保存的内容进行处理。pull.map方法是 pull-stream 流中的一个 source 流,它对数组中的每个元素使用指定的处理函数进行处理。这就是我们在add函数中把需要保存的内容转化为数组的原因。在这里,对每个数组元素进行处理的函数是normalizeContent。这个函数定义在同一个文件中,它首先检查保存的内容是否为数组,如果不是则转化为数组;然后,对数组中的每一个元素进行处理,具体如下:如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。

if(Buffer.isBuffer(data)){ data={path:'',content:pull.values([data])} }

如果保存的内容是一个 Node.js 可读流,比如文件,则把要保存的转化为路径为空字符串,内容使用 stream-to-pull-stream 类的source方法库把 Node.js 可读流转化为 pull-stream 的 source 流对象。

if(isStream.readable(data)){ data={path:'',content:toPull.source(data)} }

如果保存的内容是 pull-stream 的 source 流,则把要保存的内容转化为路径为空字符串,内容不变的对象。

if(isSource(data)){ data={path:'',content:data} }

如果要保存的内容是一个对象,并且content属性存在,且不是函数,则进行如下处理:

if(data&&data.content&&typeofdata.content!=='function'){ if(Buffer.isBuffer(data.content)){ data.content=pull.values([data.content]) }

if(isStream.readable(data.content)){ data.content=toPull.source(data.content) } }

如果指定的是路径,则进行下面的处理。

if(opts.wrapWithDirectory&&!data.path){ thrownewError('Mustprovideapathwhenwrappingwithadirectory') }

if(opts.wrapWithDirectory){ data.path=WRAPPER+data.path }

返回最终生成的要保存的内容。调用pull.flatten()方法,把前上步生成的数组进行扁平化处理。flatten方法是一个 through 流,主要是把多个流或数组流转换为一个流,比如把多个数组转换成一个数组,比如:

[ [1,2,3], [4,5,6], [7,8,9] ]

这样的数组使用这个方法处理后,最终会变成下面的数组

[1,2,3,4,5,6,7,8,9]

调用importer函数来保存内容。这个函数定义在ipfs-unixfs-importer类库中,这个类库是 IPFS 用于处理文件的布局和分块机制的 JavaScript 实现,具体如何保存内容,如何分块我们将在下篇文章中进行详细分析。调用pull.asyncMap方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。当程序执行到这里时,我们要保存的文件或内容已经保存在本地 IPFS 仓库,已经可以用使用cat、get、ls等命令来 API 来查看我们保存的内容或文件了。asyncMap方法是一个 through 流,类似于map流,但是有更好的性能。它会对每一个数组元素进行处理,这里处理函数为prepareFile。

这个函数定义在同一个文件中,它的处理具体如下:

使用已经生成文件的multihash内容生成 CID 对象。

letcid=newCID(file.multihash)

CID 构造方法会检查传入的参数,如果是 CID 对象,则直接从对象中取出版本号、编码方式、多哈希等属性;如果是字符串,则又分为是否被 multibase 编码过,如果是则需要先解码,然后再分离出各种属性,如果没有经过 multibase 编码,那么肯定是 base58 字符串,则设置版本为0,编码方式为dag-pb,再从 base58 串中获取多哈希值;如果是缓冲对象,则取得第一个字节,并按十六进制转化成整数,如果第一个字节是 0或1,则生成各自属性,否则为多哈希,则设置版本为0,编码方式为dag-pb。如果用户指定 CID 版本为 1,则生成 CID 对象到版本1.

if(opts.cidVersion===1){ cid=cid.toV1() }

接下来,调用waterfall方法,顺序处理它指定的函数。第一个函数,检查配置选项是否指定了onlyHash,即不实际地上传文件到IFS网络,仅仅计算一下这个文件的 HASH,那么直接调用第二个函数,否则,调用 IPFS 对象的object.get方法来获取指定文件在仓库中保存的节点信息。这个方法我们后面会专门讲解,这里略去不讲。第二个函数,生成最终返回给用户的对象,这个对象包括了:path、size、hash 等。

上面代码如下,比较简单,可自己阅读。

waterfall([ (cb)=>opts.onlyHash ?cb(null,file) :self.object.get(file.multihash,Object.assign({},opts,{preload:false}),cb), (node,cb)=>{ constb58Hash=cid.toBaseEncodedString()

letsize=node.size

if(Buffer.isBuffer(node)){ size=node.length }

cb(null,{ path:opts.wrapWithDirectory ?file.path.substring(WRAPPER.length) :(file.path||b58Hash), hash:b58Hash, size }) }

],callback)

调用pull.map方法,把已经保存到本地的文件预加载到指定节点。map是一个 through 流,它会对每一个数组元素进行处理,这里处理函数为preloadFile。这个函数定义在同一个文件中,这会把已经保存的文件预加载到指定的节点,具体保存在哪些节点,可以参考《精通IPFS:系统启动之概览》篇中preload.addresses,也可以手动指定。调用pull.asyncMap方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。asyncMap方法是一个 through 流,这里处理函数为pinFile。pin 操作后面我们会详细分析,这里略过不提,读者可以自行阅读相关代码。

—-

编译者/作者:星鉴网

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...