代码流程 1、./lotus wallet new 钱包类型(不同类型,签名算法不一样),不传参数默认secp256k1类型钱包 bls: secp256k1: 客户端调用lotus/api/apistruct/struct.go WalletNew函数 RPC调用->(服务端 ./lotus daemon ...) lotus/node/impl/full/wallet.go WalletNew 2、./lotus send代码流程 3、miner接单流程(./lotus client deal [cid] t01000 0.000000005 100) lotus分发:lotus/node/impl/client/client.go:ClientStartDeal->ProposeStorageDeal-> err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen) 订单文件目录 ~/.lotusstorage/fstmp* miner接收: ->github.com/filecoin-project/go-statemachine/machine.go Run reflect.ValueOf --> ->github.com/filecoin-project/go-statemachine/fsm/fsm.go handler reflect.ValueOf --> ->github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates/provider_states.go HandoffDeal(订单文件目录 ~/.lotusstorage/fstmp*) --> ->lotus/markets/storageadapter/provider.go OnDealComplete --> ->lotus/storage/sectorblocks/blocks.go:AddPiece --> ->lotus/storage/sealing.go AllocatePiece() lotus/storage/sealing.go SealPiece() --> ->lotus/extern/storage-fsm/sealing.go SealPiece() m.newSector() --> ->lotus/extern/sector-storage/manager.go AddPiece() --> ->lotus/extern/sector-storage/sched.go Schedule() 写入通道 ->runSched()->maybeSchedRequest()->assignWorker分配work处理 --> ->lotus/extern/sector-storage/localworker.go AddPiece() ->lotus/extern/sector-storage/ffiwrapper/sealer_cgo.go AddPiece() lotus/extern/storage-fsm/sealing.go SealPiece --> ->lotus/extern/sector-storage/manager.go Miner.SealPiece(lotus/storage/sealing.go) --> lotus/extern/storage-fsm/states.go newSector() --> ->github.com/filecoin-project/go-statemachine/group.go Send()->loadOrCreate() (创建状态机结构体StateMachine{... planner: s.hnd.Plan,...}) ->github.com/filecoin-project/go-statemachine/machine.go run() -> 4、miner调度及work处理 1、miner调度器(schedule):miner初始化创建并启动(./lotus-storage-miner init ...) 主要代码:lotus/extern/sector-storage/manager.go New() func New(...) ... { ... 
m := &Manager{ scfg: cfg, ls: ls, storage: stor, localStore: lstor, remoteHnd: &stores.FetchHandler{Local: lstor}, index: si, sched: newScheduler(cfg.SealProofType), Prover: prover, } go m.sched.runSched()//启动调度 ... } 2、miner状态机(machine):miner收到存储订单,AddPiece创建状态机并启动(具体在github.com/filecoin-project/go-statemachine/group.go loadOrCreate()), 状态机所有运行状态切换如下,可在lotus/extern/storage-fsm/fsm.go plan()中找到,也可以直接参考map变量fsmPlanners
/*

*   Empty
|   |
|   v
*<- Packing <- incoming
|   |
|   v
*<- PreCommit1 <--> SealFailed
|   |                 ^^^
|   v                 |||
*<- PreCommit2 -------/||
|   |                  ||
|   v          /-------/|
*   PreCommitting <-----+---> PreCommitFailed
|   |                   |     ^
|   v                   |     |
*<- WaitSeed -----------+-----/
|   |||  ^              |
|   |||  \--------*-----/
|   |||           |
|   vvv      v----+----> ComputeProofFailed
*<- Committing    |
|   |        ^--> CommitFailed
|   v             ^
*<- CommitWait ---/
|   |
|   v
*<- Proving
|
v
FailedUnrecoverable

UndefinedSectorState <- ¯\_(ツ)_/¯
    |                     ^
    *---------------------/

*/
3、miner调度器与状态机关联 miner状态机具体调用: 1、github.com/filecoin-project/go-statemachine/machine.go run() fsm.planner(创建状态机的时候指定为Plan()) 2、lotus/extern/storage-fsm/fsm.go Plan()->plan() 根据状态机状态调用对应的handle函数 说明:plan()函数中根据状态获取map变量fsmPlanners中对应的函数planOne并执行,planOne根据event消息映射出对应的状态并设置下一状态, planOne主要代码如下: ... mut, next := t() if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) { continue } ... events[0].User.(mutator).apply(state) state.State = next plan主要代码如下: ... p := fsmPlanners[state.State]//当前状态 if p == nil { return nil, xerrors.Errorf("planner for state %s not found", state.State) } if err := p(events, state); err != nil { //实际调用的是当前状态对应的planOne return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err) } switch state.State { ...//对应的handle } ... 3、lotus/extern/storage-fsm/states.go handle*** -> ctx.Send()(ctx.Send初始化在machine.go send) -> handlePreCommit1主要代码如下(举例说明): ... 
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil case *ErrInvalidDeals: return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)}) case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)}) default: return xerrors.Errorf("checkPieces sanity check error: %w", err) } } ... pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) //调用的是manager.go对应的方法 ... return ctx.Send(SectorPreCommit1{ PreCommit1Out: pc1o, TicketValue: ticketValue, TicketEpoch: ticketEpoch, }) //往通道fsm.eventsIn写入消息 其中ctx为调用handle时候指定,主要代码在github.com/filecoin-project/go-statemachine/machine.go run() for { select { case evt := <-fsm.eventsIn: pendingEvents = append(pendingEvents, evt) case <-fsm.stageDone: if len(pendingEvents) == 0 { continue } case <-fsm.closing: return } ... ctx := Context{ ctx: context.TODO(), send: func(evt interface{}) error { return fsm.send(Event{User: evt}) //往通道fsm.eventsIn写入消息(map变量fsmPlanners中的SectorStart、SectorPacked...) }, } go func() { if nextStep != nil { res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()}) if res[0].Interface() != nil { log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level return } } atomic.StoreInt32(&fsm.busy, 0) fsm.stageDone <- struct{}{} }() ... } miner调度器具体调用: 4、lotus/extern/sector-storage/manager.go SealPreCommit1主要代码(举例说明): ... 
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) if err != nil { return err } out = p return nil }) ... 5、lotus/extern/sector-storage/sched.go 1、Schedule()主要代码: { ... ret := make(chan workerResponse) select { case sh.schedule <- &workerRequest{ sector: sector, taskType: taskType, sel: sel, prepare: prepare, work: work, ret: ret, ctx: ctx, }: ... } 2、runSched()代码: { go sh.runWorkerWatcher() for { select { case w := <-sh.newWorkers: sh.schedNewWorker(w) case wid := <-sh.workerClosing: sh.schedDropWorker(wid) case req := <-sh.schedule: scheduled, err := sh.maybeSchedRequest(req) if err != nil { req.respond(err) continue } if scheduled { continue } heap.Push(sh.schedQueue, req) case wid := <-sh.workerFree: sh.onWorkerFreed(wid) case <-sh.closing: sh.schedClose() return } } } 3、分配work处理,assignWorker主要代码 { ... go func() { err := req.prepare(req.ctx, w.w) //一般是拉取数据 sh.workersLk.Unlock() ... err = w.active.withResources(sh.spt, wid, w.info.Resources, needRes, &sh.workersLk, func() error { w.preparing.free(w.info.Resources, needRes) sh.workersLk.Unlock() defer sh.workersLk.Lock() // we MUST return locked from this function select { case sh.workerFree <- wid: case <-sh.closing: } err = req.work(req.ctx, w.w) //具体处理 p1 p2 c1 c2等 select { case req.ret <- workerResponse{err: err}: log.Warnf(" work Response ", err) case <-req.ctx.Done(): log.Warnf("request got cancelled before we could respond") case <-sh.closing: log.Warnf("scheduler closed while sending response") } return nil }) sh.workersLk.Unlock() ... }() ... 
} 6、lotus/extern/sector-storage/localworker.go SealPreCommit1主要代码(举例说明): { { // cleanup previous failed attempts if they exist if err := l.storage.Remove(ctx, sector, stores.FTSealed, true); err != nil { return nil, xerrors.Errorf("cleaning up sealed data: %w", err) } if err := l.storage.Remove(ctx, sector, stores.FTCache, true); err != nil { return nil, xerrors.Errorf("cleaning up cache data: %w", err) } } sb, err := l.sb() if err != nil { return nil, err } log.Warnf("LocalWorker SealPreCommit1") return sb.SealPreCommit1(ctx, sector, ticket, pieces) } 7、lotus/extern/sector-storage/ffiwrapper/sealer_cgo.go SealPreCommit1 扇区正常状态(假设每个状态都正确执行)流程: 状态:UndefinedSectorState->Packing->PreCommit1 -> PreCommit2 ->PreCommitting->PreCommitWait->WaitSeed->Committing ->CommitWait->FinalizeSector->... | | | | | v v v v v 对应事件: TTPreCommit1 TTPreCommit2 TTCommit1 TTCommit2? TTFinalize | v 分配work处理