lotus-4 代码详解之miner调度及work处理

代码流程         1、./lotus wallet new              钱包类型(不同类型,签名算法不一样),不传参数默认secp256k1类型钱包                 bls:                 secp256k1:             客户端调用lotus/api/apistruct/struct.go  WalletNew函数 RPC调用->(服务端 ./lotus daemon ...) lotus/node/impl/full/wallet.go  WalletNew         2、./lotus send代码流程         3、miner接单流程(./lotus client deal [cid] t01000 0.000000005 100)             lotus分发:lotus/node/impl/client/client.go:ClientStartDeal->ProposeStorageDeal->                       err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen) 订单文件目录  ~/.lotusstorage/fstmp*             miner接收:                         ->github.com/filecoin-project/go-statemachine/machine.go  Run reflect.ValueOf -->                         ->github.com/filecoin-project/go-statemachine/fsm/fsm.go handler reflect.ValueOf -->                         ->github.com/filecoin-project/go-fil-market/storagemarket/impl/providerstates/provider_states.go HandoffDeal(订单文件目录  ~/.lotusstorage/fstmp*) -->                         ->lotus/markets/storageadapter/provider.go   OnDealComplete -->                         ->lotus/storage/sectorblocks/blocks.go:AddPiece -->                         ->lotus/storage/sealing.go AllocatePiece() lotus/storage/sealing.go SealPiece() -->                         ->lotus/extern/storage-fms/sealing.go SealPiece() m.newSector() -->                         ->lotus/extern/sector-storage/manager.go AddPiece() -->                         ->lotus/extern/sector-storage/sched.go Schedule() 写入通道 ->runSched()->maybeSchedRequest()->assignWorker分配work处理 -->                         ->lotus/extern/sector-storage/localworker.go AddPiece()                             ->lotus/extern/sector-storage/ffiwrapper/sealer_cgo.go AddPiece()                                                             lotus/extern/storage-fms/sealing.go SealPiece -->                               ->lotus/extern/sector-storage/manager.goMiner.SealPiece(lotus/storage/sealing.go) -->                                 lotus/extern/storage-fms/states.go newSector() -->                                 ->github.com/filecoin-project/go-statemachine/group.go Send()->loadOrCreate() (创建状态机结构体StateMachine{... planner:  s.hnd.Plan,...})                                  ->github.com/filecoin-project/go-statemachine/machine.go run()                                   ->                     4、miner调度及work处理             1、miner调度器(schedule):miner初始化创建并启动(./lotus-storage-miner init ...)                  主要代码:lotus/extern/sector-storage/manager.go New()                 func New(...) ...                     {                        ...                         m := &Manager{                            scfg: cfg,                             ls:         ls,                             storage:    stor,                             localStore: lstor,                             remoteHnd:  &stores.FetchHandler{Local: lstor},                             index:      si,                             sched: newScheduler(cfg.SealProofType),                             Prover: prover,                         }                         go m.sched.runSched()//启动调度                         ...                     }             2、miner状态机(machine):miner收到存储订单,AddPiece创建状态机并启动(具体在github.com/filecoin-project/go-statemachine/group.go loadOrCreate()),                 状态机所有运行状态切换如下,可在lotus/extern/storage-fms/fsm.go plan()中找到,也可以直接参考map变量fsmPlanners                     /*                     *   Empty                     |   |                     |   v                     *<- Packing <- incoming                     |   |                     |   v                     *<- PreCommit1 <--> SealFailed                     |   |                 ^^^                     |   v                 |||                     *<- PreCommit2 -------/||                     |   |                  ||                     |   v          /-------/|                     *   PreCommitting <-----+---> PreCommitFailed                     |   |                   |     ^                     |   v                   |     |                     *<- WaitSeed -----------+-----/                     |   |||  ^              |                     |   |||  \--------*-----/                     |   |||           |                     |   vvv      v----+----> ComputeProofFailed                     *<- Committing    |                     |   |        ^--> CommitFailed                     |   v             ^                     *<- CommitWait ---/                     |   |                     |   v                     *<- Proving                     |                     v                     FailedUnrecoverable                     UndefinedSectorState <- ¯\_(ツ)_/¯                         |                     ^                         *---------------------/                 */             3、miner调度器与状态机关联                 miner状态机具体调用:                 1、github.com/filecoin-project/go-statemachine/machine.go run() fsm.planner(创建状态机的时候指定为Plan())                 2、lotus/extern/storage-fms/fsm.go Plan()->plan() 根据状态机状态调用对应的handle函数                     说明:plan()函数中根据状态获取map变量fsmPlanners中对应的函数planOne并执行,planOne根据event消息映射出对应的状态并设置下一状态,                         planOne主要代码如下:                             ...                             mut, next := t()                             if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {                                continue                             }                             ...                             events[0].User.(mutator).apply(state)                             state.State = next                         plan主要代码如下:                             ...                             p := fsmPlanners[state.State]//当前状态                             if p == nil {                                return nil, xerrors.Errorf("planner for state %s not found", state.State)                             }                             if err := p(events, state); err != nil { //实际调用的是当前状态对应的planOne                                 return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)                             }                             switch state.State {                                ...//对应的handle                             }                             ...                 3、lotus/extern/storage-fms/states.go handle***       -> ctx.Send()(ctx.Send初始化在machine.go send) ->                      handlePreCommit1主要代码如下(举例说明):                         ...                         if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state                             switch err.(type) {                            case *ErrApi:                                 log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)                                 return nil                             case *ErrInvalidDeals:                                 return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)})                             case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?                                 return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)})                             default:                                 return xerrors.Errorf("checkPieces sanity check error: %w", err)                             }                         }                         ...                             pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) //调用的是manager.go对应的方法                         ...                         return ctx.Send(SectorPreCommit1{                            PreCommit1Out: pc1o,                             TicketValue:   ticketValue,                             TicketEpoch:   ticketEpoch,                         }) //往通道fsm.eventsIn写入消息                     其中ctx为调用handle时候指定,主要代码在github.com/filecoin-project/go-statemachine/machine.go run()                         for {                        select {                            case evt := <-fsm.eventsIn:                                 pendingEvents = append(pendingEvents, evt)                             case <-fsm.stageDone:                                 if len(pendingEvents) == 0 {                                    continue                                 }                             case <-fsm.closing:                                 return                             }                             ...                             ctx := Context{                                ctx: context.TODO(),                                 send: func(evt interface{}) error {                                    return fsm.send(Event{User: evt}) //往通道fsm.eventsIn写入消息(map变量fsmPlanners中的SectorStart、SectorPacked...)                                 },                             }                             go func() {                                if nextStep != nil {                                    res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()})                                     if res[0].Interface() != nil {                                        log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level                                         return                                     }                                 }                                 atomic.StoreInt32(&fsm.busy, 0)                                 fsm.stageDone <- struct{}{}                             }()                             ...                         }                 miner调度器具体调用:                         4、lotus/extern/sector-storage/manager.go                     SealPreCommit1主要代码(举例说明):                         ...                         err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {                            p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)                             if err != nil {                                return err                             }                             out = p                             return nil                         })                         ...                 5、lotus/extern/sector-storage/sched.go                       1、Schedule()主要代码:                     {                        ...                         ret := make(chan workerResponse)                         select {                        case sh.schedule <- &workerRequest{                            sector:   sector,                             taskType: taskType,                             sel:      sel,                             prepare: prepare,                             work:    work,                             ret: ret,                             ctx: ctx,                         }:                         ...                     }                     2、runSched()代码:                     {                                                 go sh.runWorkerWatcher()                         for {                            select {                            case w := <-sh.newWorkers:                                 sh.schedNewWorker(w)                             case wid := <-sh.workerClosing:                                 sh.schedDropWorker(wid)                             case req := <-sh.schedule:                                 scheduled, err := sh.maybeSchedRequest(req)                                 if err != nil {                                    req.respond(err)                                     continue                                 }                                 if scheduled {                                    continue                                 }                                 heap.Push(sh.schedQueue, req)                             case wid := <-sh.workerFree:                                 sh.onWorkerFreed(wid)                             case <-sh.closing:                                 sh.schedClose()                                 return                             }                         }                     }                     3、分配work处理,assignWorker主要代码                         {                            ...                             go func() {                                err := req.prepare(req.ctx, w.w) //一般是拉取数据                                 sh.workersLk.Unlock()                                 ...                                 err = w.active.withResources(sh.spt, wid, w.info.Resources, needRes, &sh.workersLk, func() error {                                    w.preparing.free(w.info.Resources, needRes)                                     sh.workersLk.Unlock()                                     defer sh.workersLk.Lock() // we MUST return locked from this function                                     select {                                    case sh.workerFree <- wid:                                     case <-sh.closing:                                     }                                     err = req.work(req.ctx, w.w) //具体处理 p1 p2 c1 c2等                                     select {                                    case req.ret <- workerResponse{err: err}:                                         log.Warnf(" work Response ", err)                                     case <-req.ctx.Done():                                         log.Warnf("request got cancelled before we could respond")                                     case <-sh.closing:                                         log.Warnf("scheduler closed while sending response")                                     }                                     return nil                                 })                                 sh.workersLk.Unlock()                                 ...                             }()                             ...                         }                 6、lotus/extern/sector-storage/localworker.go                     SealPreCommit1主要代码(举例说明):                     {                        {                            // cleanup previous failed attempts if they exist                             if err := l.storage.Remove(ctx, sector, stores.FTSealed, true); err != nil {                                return nil, xerrors.Errorf("cleaning up sealed data: %w", err)                             }                             if err := l.storage.Remove(ctx, sector, stores.FTCache, true); err != nil {                                return nil, xerrors.Errorf("cleaning up cache data: %w", err)                             }                         }                         sb, err := l.sb()                         if err != nil {                            return nil, err                         }                         log.Warnf("LocalWorker SealPreCommit1")                         return sb.SealPreCommit1(ctx, sector, ticket, pieces)                     }                 7、lotus/extern/sector-storage/ffiwrapper/sealer_cgo.go                          SealPreCommit1 扇区正常状态(假设每个状态都正确执行)流程: 状态:UndefinedSectorState->Packing->PreCommit1 -> PreCommit2 ->PreCommitting->PreCommitWait->WaitSeed->Committing              ->CommitWait->FinalizeSector->...   |                                    |              |                                                    |                                   |   v                                    v              v                                                    v                                   v 对应事件:                            TTPreCommit1 TTPreCommit2                                          TTCommit1 TTCommit2?                   TTFinalize   |   v 分配work处理