LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 行情分析 > 精通 Filecoin:Lotus真实数据处理之Provider处理存储

精通 Filecoin:Lotus真实数据处理之Provider处理存储

2020-08-07 乔疯 来源:区块链网络

接上篇,当 Client 接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1 协议的流,然后通过流发送存储交易。处理这个协议的正是 HandleDealStream 方法。这个方法直接调用自身的 receiveDeal 方法进行处理。receiveDeal 方法处理如下: 从流中读取存储提案 Proposal 对象。

proposal,?err?:=?s.ReadDealProposal()
这里的流对象是 dealStream 对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。获取 ipld node 对象。
proposalNd,?err?:=?cborutil.AsIpld(proposal.DealProposal)
生成矿工交易对象。
deal?:=?&storagemarket.MinerDeal{Client:s.RemotePeer(),Miner:p.net.ID(),ClientDealProposal:?*proposal.DealProposal,ProposalCid:proposalNd.Cid(),State:storagemarket.StorageDealUnknown,Ref:proposal.Piece,
}
调用 fsm 状态组的 Begin 的方法,生成一个状态机,并开始跟踪矿工交易对象。
err?=?p.deals.Begin(proposalNd.Cid(),?deal)
保存流对象到连接管理器中。
err?=?p.conns.AddStream(proposalNd.Cid(),?s)
发送事件到 fsm 状态组,从而开始对交易对象进行处理。
return?p.deals.Send(proposalNd.Cid(),?storagemarket.ProviderEventOpen)
当处理机收到 ProviderEventOpen 状态事件时,因为初始状态为默认值 0,即 StorageDealUnknown,事件处理器对象经过内部处理找到对应的目的状态为 StorageDealValidating,从而调用其处理函数 ValidateDealProposal 函数进行处理。

1、`ValidateDealProposal` 函数 这个函数用来验证交易提案对象。 调用 Lotus Provider 适配器对象的 GetChainHead 方法,获取区块链顶部 tipset key 和其高度。

tok,?height,?err?:=?environment.Node().GetChainHead(ctx.Context())

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("node?error?getting?most?recent?state?id:?%w",?err)) }

验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。
if?err?:=?providerutils.VerifyProposal(ctx.Context(),?deal.ClientDealProposal,?tok,?environment.Node().VerifySignature);?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("verifying?StorageDealProposal:?%w",?err))
}
检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。
proposal?:=?deal.Proposal

if?proposal.Provider?!=?environment.Address()?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("incorrect?provider?for?deal")) }

检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。
if?height?>?proposal.StartEpoch-environment.DealAcceptanceBuffer()?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("deal?start?epoch?is?too?soon?or?deal?already?expired"))
}
检查费用是否OK,如果不OK,则发送拒绝事件。
minPrice?:=?big.Div(big.Mul(environment.Ask().Price,?abi.NewTokenAmount(int64(proposal.PieceSize))),?abi.NewTokenAmount(1<<30))
if?proposal.StoragePricePerEpoch.LessThan(minPrice)?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,xerrors.Errorf("storage?price?per?epoch?less?than?asking?price:?%s?<?%s",?proposal.StoragePricePerEpoch,?minPrice))
}
检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。
if?proposal.PieceSize?<?environment.Ask().MinPieceSize?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,xerrors.Errorf("piece?size?less?than?minimum?required?size:?%d?<?%d",?proposal.PieceSize,?environment.Ask().MinPieceSize))
}

if?proposal.PieceSize?>?environment.Ask().MaxPieceSize?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,xerrors.Errorf("piece?size?more?than?maximum?allowed?size:?%d?>?%d",?proposal.PieceSize,?environment.Ask().MaxPieceSize)) }

获取客户的资金。
clientMarketBalance,?err?:=?environment.Node().GetBalance(ctx.Context(),?proposal.Client,?tok)
if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("node?error?getting?client?market?balance?failed:?%w",?err))
}
如果客户可用资金小于总的交易费用,则发送拒绝事件。
if?clientMarketBalance.Available.LessThan(proposal.TotalStorageFee())?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.New("clientMarketBalance.Available?too?small"))
}
如果交易是验证过的,则进行验证。fsm 上下文对象的 Trigger 方法,发送事件。
return?ctx.Trigger(storagemarket.ProviderEventDealDeciding)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealUnknown 修改为 StorageDealAcceptWait,从而调用其处理函数 DecideOnProposal 确定是否接收交易。 2、`DecideOnProposal` 函数 这个函数用来决定接受或拒绝交易。 调用环境对象的 RunCustomDecisionLogic 方法,运行自定义逻辑来验证是不接收客户交易。
accept,?reason,?err?:=?environment.RunCustomDecisionLogic(ctx.Context(),?deal)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("custom?deal?decision?logic?failed:?%w",?err)) }

如果不接收,则发送拒绝事件。
if?!accept?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?fmt.Errorf(reason))
}
调用环境对象的 SendSignedResponse 方法,发送签名的响应给客户端。
err?=?environment.SendSignedResponse(ctx.Context(),?&network.Response{State:storagemarket.StorageDealWaitingForData,Proposal:?deal.ProposalCid,
})

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventSendResponseFailed,?err) }

这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。断开与客户端的连接。
if?err?:=?environment.Disconnect(deal.ProposalCid);?err?!=?nil?{log.Warnf("closing?client?connection:?%+v",?err)
}
调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
return?ctx.Trigger(storagemarket.ProviderEventDataRequested)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealAcceptWait 修改为 StorageDealWaitingForData,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

当数据开始传输时,数据传输组件发送 ProviderEventDataTransferInitiated 事件,经过事件处理器把状态从 StorageDealWaitingForData 修改为 StorageDealTransferring,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。

当数据传输完成时,数据传输组件发送 ProviderEventDataTransferCompleted 事件,经过事件处理器把状态从 StorageDealTransferring 修改为 StorageDealVerifyData,从而调用其处理函数 VerifyData 验证数据。 3、`VerifyData` 函数 这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。

VerifyData 函数流程如下: 调用环境对象的 GeneratePieceCommitmentToFile 方法,生成碎片的 CID 、碎片所在目录和元数据目录。

pieceCid,?piecePath,?metadataPath,?err?:=?environment.GeneratePieceCommitmentToFile(deal.Ref.Root,?shared.AllSelector())
GeneratePieceCommitmentToFile 方法内容如下: 如果矿工设置了 universalRetrievalEnabled 标志,则直接调用 GeneratePieceCommitmentWithMetadata 函数进行处理。
if?p.p.universalRetrievalEnabled?{return?providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs,?p.p.pio.GeneratePieceCommitmentToFile,?p.p.proofType,?payloadCid,?selector)
}
universalRetrievalEnabled 标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。否则,调用 piece IO 对象的 GeneratePieceCommitmentToFile 方法进行处理。
pieceCid,?piecePath,?_,?err?:=?p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType,?payloadCid,?selector)
payloadCid 表示根 Root CID。

piece IO 对象的 GeneratePieceCommitmentToFile 方法处理如下: 调用文件存储对象的 CreateTemp 方法,创建一个临时文件。

f,?err?:=?pio.store.CreateTemp()
生成一个清理函数。
cleanup?:=?func()?{f.Close()_?=?pio.store.Delete(f.Path())
}
从底层存储对象中获取指定 CID 的内容,然后写入指定文件。
err?=?pio.carIO.WriteCar(context.Background(),?pio.bs,?payloadCid,?selector,?f,?userOnNewCarBlocks...)
获取文件大小,即碎片大小。
pieceSize?:=?uint64(f.Size())
定位到文件开头位置。
_,?err?=?f.Seek(0,?io.SeekStart)
使用文件内容生成碎片 ID。
commitment,?paddedSize,?err?:=?GeneratePieceCommitment(rt,?f,?pieceSize)
关闭文件。
_?=?f.Close()
返回碎片 CID 和文件路径。
return?commitment,?f.Path(),?paddedSize,?nil
返回碎片 CID 和碎片路径。
return?pieceCid,?piecePath,?filestore.Path(""),?err
验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。
if?pieceCid?!=?deal.Proposal.PieceCID?{return?ctx.Trigger(storagemarket.ProviderEventDealRejected,?xerrors.Errorf("proposal?CommP?doesn't?match?calculated?CommP"))
}
3. 调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
return?ctx.Trigger(storagemarket.ProviderEventVerifiedData,?piecePath,?metadataPath)

当状态机收到这个事件后,经过事件处理器把状态从?`StorageDealVerifyData`?修改为?`StorageDealEnsureProviderFunds`,从而调用其处理函数?`EnsureProviderFunds`?确定是否接收交易。同时,在调用处理函数之前,通过?`Action`?函数,修改矿工交易对象的?`PiecePath`?和?`MetadataPath`?两个属性。

4、`EnsureProviderFunds` 函数 这个函数用来确定矿工有足够的资金来处理当前交易。 获取 Lotus Provider 适配器。
node?:=?environment.Node()
获取区块链顶部 tipset 对应的 key 和高度。
tok,?_,?err?:=?node.GetChainHead(ctx.Context())

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("acquiring?chain?head:?%w",?err)) }

获取矿工的 worker 地址。
waddr,?err?:=?node.GetMinerWorkerAddress(ctx.Context(),?deal.Proposal.Provider,?tok)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("looking?up?miner?worker:?%w",?err)) }

调用 Lotus Provider 适配器的 EnsureFunds 方法,确保矿工有足够的资金来处理当前交易。
mcid,?err?:=?node.EnsureFunds(ctx.Context(),?deal.Proposal.Provider,?waddr,?deal.Proposal.ProviderCollateral,?tok)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("ensuring?funds:?%w",?err)) }

如果返回的 mcid 是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的 Trigger 方法,发送一个事件。
if?mcid?==?cid.Undef?{return?ctx.Trigger(storagemarket.ProviderEventFunded)
}
否则,调用 fsm 上下文对象的 Trigger 方法,发送另一个事件。
return?ctx.Trigger(storagemarket.ProviderEventFundingInitiated,?mcid)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureProviderFunds 修改为 StorageDealProviderFunding,从而调用其处理函数 WaitForFunding 等待产一步的消息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。 5、`WaitForFunding` 函数 这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger 方法,发送一个事件。

函数内容如下:

node?:=?environment.Node()

return?node.WaitForMessage(ctx.Context(),?*deal.AddFundsCid,?func(code?exitcode.ExitCode,?bytes?[]byte,?err?error)?error?{if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("AddFunds?errored:?%w",?err))}if?code?!=?exitcode.Ok?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("AddFunds?exit?code:?%s",?code.String()))}return?ctx.Trigger(storagemarket.ProviderEventFunded) })

当状态机收到 ProviderEventFunded 这个事件后,经过事件处理器把状态从 StorageDealProviderFunding 修改为 StorageDealPublish,从而调用其处理函数 PublishDeal 把交易信息上链。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 PublishCid 属性。 6、`PublishDeal` 函数 这个函数主要用来提交交易信息上链。 生成矿工交易对象。
smDeal?:=?storagemarket.MinerDeal{Client:deal.Client,ClientDealProposal:?deal.ClientDealProposal,ProposalCid:deal.ProposalCid,State:deal.State,Ref:deal.Ref,
}
调用 Lotus Provider 适配器对象的 PublishDeals 把交易信息上链。
mcid,?err?:=?environment.Node().PublishDeals(ctx.Context(),?smDeal)
if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventNodeErrored,?xerrors.Errorf("publishing?deal:?%w",?err))
}
调用 fsm 上下文对象的 Trigger 方法,发送事件。
return?ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated,?mcid)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublish 修改为 StorageDealPublishing,从而调用其处理函数 WaitForPublish 等待交易信息上链。 7、`WaitForPublish` 函数 这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished。

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing 修改为 StorageDealStaged,从而调用其处理函数 HandoffDeal 开始扇区密封处理。同时,在调用处理函数之前,通过 Action 函数,修改矿工交易对象的 ConnectionClosed 和 DealID 属性。

return?environment.Node().WaitForMessage(ctx.Context(),?*deal.PublishCid,?func(code?exitcode.ExitCode,?retBytes?[]byte,?err?error)?error?{if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealPublishError,?xerrors.Errorf("PublishStorageDeals?errored:?%w",?err))}if?code?!=?exitcode.Ok?{return?ctx.Trigger(storagemarket.ProviderEventDealPublishError,?xerrors.Errorf("PublishStorageDeals?exit?code:?%s",?code.String()))}var?retval?market.PublishStorageDealsReturnerr?=?retval.UnmarshalCBOR(bytes.NewReader(retBytes))if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealPublishError,?xerrors.Errorf("PublishStorageDeals?error?unmarshalling?result:?%w",?err))}

return?ctx.Trigger(storagemarket.ProviderEventDealPublished,?retval.IDs[0]) })

8、`HandoffDeal` 函数 这个函数调用 miner 的 Provide 适配器的 使用碎片路径生成文件对象。
file,?err?:=?environment.FileStore().Open(deal.PiecePath)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventFileStoreErrored,?xerrors.Errorf("reading?piece?at?path?%s:?%w",?deal.PiecePath,?err)) }

使用碎片文件流生成碎片流。
paddedReader,?paddedSize?:=?padreader.New(file,?uint64(file.Size()))
调用 Lotus Provider 适配器对象的 OnDealComplete 方法,通知交易已经完成,从而把碎片加入某个扇区中。
err?=?environment.Node().OnDealComplete(ctx.Context(),storagemarket.MinerDeal{Client:deal.Client,ClientDealProposal:?deal.ClientDealProposal,ProposalCid:deal.ProposalCid,State:deal.State,Ref:deal.Ref,DealID:deal.DealID,FastRetrieval:deal.FastRetrieval,PiecePath:filestore.Path(environment.FileStore().Filename(deal.PiecePath)),
},paddedSize,paddedReader,
)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed,?err) }

调用 fsm 上下文对象的 Trigger 方法,发送事件。
return?ctx.Trigger(storagemarket.ProviderEventDealHandedOff)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStaged 修改为 StorageDealSealing,从而调用其处理函数 VerifyDealActivated 等待扇区密封结果。 9、`VerifyDealActivated` 函数 生成回调函数。
cb?:=?func(err?error)?{if?err?!=?nil?{_?=?ctx.Trigger(storagemarket.ProviderEventDealActivationFailed,?err)}?else?{_?=?ctx.Trigger(storagemarket.ProviderEventDealActivated)}
}
当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealSealing 修改为 StorageDealActive,从而调用其处理函数 RecordPieceInfo 记录相关信息。调用 Lotus Provider 适配器对象的 OnDealSectorCommitted 方法,等待扇区被提交。

err?:=?environment.Node().OnDealSectorCommitted(ctx.Context(),?deal.Proposal.Provider,?deal.DealID,?cb)

if?err?!=?nil?{return?ctx.Trigger(storagemarket.ProviderEventDealActivationFailed,?err) }

返回空。
return?nil
9、`RecordPieceInfo` 函数 这个函数主要记录相关信息。

最后调用 fsm 上下文对象的 Trigger 方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted。

当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive 修改为 StorageDealCompleted,最终结束状态机处理。

这里会删除碎片的临时文件。

—-

编译者/作者:乔疯

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...