LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 区块链资讯 > 精通 Filecoin:Lotus真实数据处理之Provider初始化

精通 Filecoin:Lotus真实数据处理之Provider初始化

2020-08-03 乔疯 来源:区块链网络

因为 StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中,DI 容器会调用 StorageProvider 函数(node/modules/storageminer.go)来创建它。StorageProvider 函数流程如下: 调用 NewFromLibp2pHost 函数,生成 StorageMarketNetwork 对象。

net?:=?smnet.NewFromLibp2pHost(h)
调用 NewLocalFileStore 函数,生成 FileStore 存储对象。
store,?err?:=?piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
NewLocalFileStore 函数(go-fil-markets 类库 filestore/filestore.go)流程如下:
base?:=?filepath.Clean(string(basedirectory))
info,?err?:=?os.Stat(string(base))

if?!info.IsDir()?{return?nil,?fmt.Errorf("%s?is?not?a?directory",?base) }

return?&fileStore{string(base)},?nil

NewLocalFileStore 函数使用的路径为仓库目录。即碎片的临时目录就是仓库目录。调用 CustomDealDecisionLogic 函数,返回一个函数对象。在函数对象中调用我们提供的回调函数,进行自定义交易逻辑判断。
opt?:=?storageimpl.CustomDealDecisionLogic(func(ctx?context.Context,?deal?storagemarket.MinerDeal)?(bool,?string,?error)?{

})

生成并返回 StorageProvider 对象。
p,?err?:=?storageimpl.NewProvider(net,?namespace.Wrap(ds,?datastore.NewKey("/deals/provider")),?ibs,?store,?pieceStore,?dataTransfer,?spn,?address.Address(minerAddress),?ffiConfig.SealProofType,?storedAsk,?opt)

return?p,?nil

NewProvider 函数处理如下: 生成 PieceIOWithStore 对象。
carIO?:=?cario.NewCarIO()
pio?:=?pieceio.NewPieceIOWithStore(carIO,?fs,?bs)
生成 Provider 对象。
h?:=?&Provider{net:net,proofType:rt,spn:spn,fs:fs,pio:pio,pieceStore:pieceStore,conns:connmanager.NewConnManager(),storedAsk:storedAsk,actor:minerAddress,dataTransfer:dataTransfer,dealAcceptanceBuffer:?DefaultDealAcceptanceBuffer,pubSub:pubsub.New(providerDispatcher),
}
生成 fsm 状态组对象。
deals,?err?:=?NewProviderStateMachine(ds,&providerDealEnvironment{h},h.dispatch,
)

h.deals?=?deals

fsm 状态组对象使用的配置参数如下:
return?fsm.New(ds,?fsm.Parameters{Environment:env,StateType:storagemarket.MinerDeal{},StateKeyField:"State",Events:providerstates.ProviderEvents,StateEntryFuncs:?providerstates.ProviderStateEntryFuncs,FinalityStates:providerstates.ProviderFinalityStates,Notifier:notifier,
})
环境对象为 providerDealEnvironment。状态对象为 MinerDeal。状态字段为 State。事件集合为 ProviderEvents,参考 storagemarket/impl/providerstates/provider_fsm.go 文件。状态处理函数集合 为 ProviderStateEntryFuncs,状态机的状态处理器根据对应的状态获取到指定的函数进行处理。终止状态集合为 ProviderFinalityStates。通知对象为 Provider 对象的 dispatch 方法。 使用配置选项,配置 Provider 对象。
h.Configure(options...)
设置数据传输监听对象。
dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))
当开始数据传输、传输结束、传输错误时会发送 ProviderEventDataTransferInitiated、ProviderEventDataTransferCompleted、ProviderEventDataTransferFailed 等事件到 fsm 状态组。返回 Provider 对象。

在存储矿工启动过程自动调用 HandleDeals 函数(node/modules/storageminer.go)。在这个函数中,调用 StorageProvider 对象的 Start 方法,从而启动这个对象。

Start 方法执行过程如下: 调用 StorageMarketNetwork 网络对象的 SetDelegate 设置代理/委托为自身。

err?:=?p.net.SetDelegate(p)
网络对象的实现为 libp2pStorageMarketNetwork 结构体(storagemarket/network/libp2p_impl.go)。它的 SetDelegate 方法内容如下:
impl.receiver?=?r
impl.host.SetStreamHandler(storagemarket.DealProtocolID,?impl.handleNewDealStream)
impl.host.SetStreamHandler(storagemarket.AskProtocolID,?impl.handleNewAskStream)
return?nil
上面分别设置网络对象的 handleNewDealStream 方法处理 DealProtocolID 协议,表示存储;handleNewAskStream 方法 处理 AskProtocolID 协议,表示 ask。

handleNewDealStream 方法内容如下:

//?客户端?peer?id
remotePID?:=?s.Conn().RemotePeer()

buffered?:=?bufio.NewReaderSize(s,?16)

//?对流进行包装 ds?:=?&dealStream{remotePID,?impl.host,?s,?buffered}

//?调用?StorageProvider?对象的?HandleDealStream?方法,处理客户端存储请求 impl.receiver.HandleDealStream(ds)

在协程中调用 StorageProvider 对象的 restartDeals 方法,重新进行交易处理。restartDeals 方法流程如下: 从 fsm 状态组对象中获取所有的交易对象。
var?deals?[]storagemarket.MinerDeal
err?:=?c.deals.List(&deals)
遍历所有的交易对象,进行下面的处理: 如果当前交易对象已经终止,则进行下一个处理。如果当前交易对象的连接已经关闭,则进行下一个处理。发送初始交易事件给 fsm 状态组。
err?=?c.deals.Send(deal.ProposalCid,?storagemarket.ProviderEventRestart)
交易提案的 Cid 表示了状态机的名称/编号。 返回空值。

—-

编译者/作者:乔疯

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...