掘金 后端 ( ) • 2024-04-01 23:46

influxdb官方文档的启动命令除了常见的systemctl命令启动,还有其他的从配置文件启动:

influxd -config /etc/influxdb/influxdb.conf

结合influxdb的代码目录,influxd的启动入口应该在: influxdb/cmd/influxd/main.go 通过代码可以发现,还包含了如备份,恢复等操作的实现。

Run(args ...string) error {  
   name, args := cmd.ParseCommandName(args)  
  
   // Extract name from args.  
   switch name {  
   case "", "run":  
      cmd := run.NewCommand()  
  
      // Tell the server the build details.  
      cmd.Version = version  
      cmd.Commit = commit  
      cmd.Branch = branch  
	  // 这里启动
      if err := cmd.Run(args...); err != nil {  
         return fmt.Errorf("run: %s", err)  
      }
	  // 此处省略一部分代码
	  case "backup":  
		   name := backup.NewCommand()  
		   if err := name.Run(args...); err != nil {  
		      return fmt.Errorf("backup: %s", err)  
		   }  
		case "restore":  
		   name := restore.NewCommand()  
		   if err := name.Run(args...); err != nil {  
		      return fmt.Errorf("restore: %s", err)  
		   }  
		case "config":  
		   if err := run.NewPrintConfigCommand().Run(args...); err != nil {  
		      return fmt.Errorf("config: %s", err)  
		   }  
		case "version":  
		   if err := NewVersionCommand().Run(args...); err != nil {  
		      return fmt.Errorf("version: %s", err)  
		   }  
		case "help":  
		   if err := help.NewCommand().Run(args...); err != nil {  
		      return fmt.Errorf("help: %s", err)  
		   }  
		default:  
		   return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)  
		}
}

点进去

func (cmd *Command) Run(args ...string) error {
	// 前面一大堆都在解析各种配置
	s, err := NewServer(config, buildInfo)  
	if err != nil {  
	   return fmt.Errorf("create server: %s", err)  
	}  
	s.Logger = cmd.Logger  
	s.CPUProfile = options.CPUProfile  
	s.MemProfile = options.MemProfile  
	// 这里实际进入了启动流程
	if err := s.Open(); err != nil {  
	   return fmt.Errorf("open server: %s", err)  
	}
}

点进去server对象的Open函数,这里一些核心的部件在这里启动,其中最主要的是TSDB。之后下面会分组件依次介绍。

func (s *Server) Open() error {  
   // 一些服务的特质,下面的代码就进入核心组件的启动流程了。
  
   // Open TSDB store.  加载series 和 shards文件,同时加载database,meansurment,rp等信息。不过主要还是加载shards和series。
   if err := s.TSDBStore.Open(); err != nil {  
      return fmt.Errorf("open tsdb store: %s", err)  
   }  
   // 打开订阅者服务
   // Open the subscriber service  
   if err := s.Subscriber.Open(); err != nil {  
      return fmt.Errorf("open subscriber: %s", err)  
   }  
   // 开启数据写入服务
   // Open the points writer service  
   if err := s.PointsWriter.Open(); err != nil {  
      return fmt.Errorf("open points writer: %s", err)  
   }  
  
   s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())  
   // 开启服务
   for _, service := range s.Services {  
      if err := service.Open(); err != nil {  
         return fmt.Errorf("open service: %s", err)  
      }  
   }  
   // 是否开启influxdb的使用上报服务,类似于上报一些数据以便于influxdb分析,持续改进
   // Start the reporting service, if not disabled.  
   if !s.reportingDisabled {  
      go s.startServerReporting()  
   }  
  
   return nil  
}

TSDB 组件启动

TSDB 涉及到的一些关键组件:留个坑,可以先看这篇文章: https://lrita.github.io/2017/06/12/influxdb-tsdb/

TSDB实例化是在上文提到过的NewServer函数中:

s.TSDBStore = tsdb.NewStore(c.Data.Dir)

从这行代码可以看到,初始化一个tsdbstore实例需要的必要参数是指向influxdb存储目录data目录的路径。

func NewStore(path string) *Store {  
   logger := zap.NewNop()  
   return &Store{  
      databases:           make(map[string]*databaseState),  
      path:                path,  
      sfiles:              make(map[string]*SeriesFile),  
      indexes:             make(map[string]interface{}),  
      pendingShardDeletes: make(map[uint64]struct{}),  
      epochs:              make(map[uint64]*epochTracker),  
      EngineOptions:       NewEngineOptions(),  
      Logger:              logger,  
      baseLogger:          logger,  
   }  
}

由于本篇文章只是梳理influxdb启动流程,关于Tsdb的内容后面我看明白了在写,言下之意现在我也没看明白。 Store对象有一个Open函数,Tsdb的初始化入口就在这里: 源码位置: tsdb/store.go:202

func (s *Store) Open() error {  
   s.mu.Lock()  
   defer s.mu.Unlock()  
  
   if s.opened {  
	  // 防止重复初始化
      // Already open  
      return nil  
   }  
  
   s.closing = make(chan struct{})  
   s.shards = map[uint64]*Shard{}  
  
   s.Logger.Info("Using data dir", zap.String("path", s.Path()))  
  
   // Create directory.  
   // 如果不存在data目录,则创建一个
   if err := os.MkdirAll(s.path, 0777); err != nil {  
      return err  
   }  

   if err := s.loadShards(); err != nil {  
      return err  
   }  
  
   s.opened = true  
  
   if !s.EngineOptions.MonitorDisabled {  
      s.wg.Add(1)  
      go func() {  
         s.wg.Done()  
         s.monitorShards()  
      }()  
   }  
  
   return nil  
}

阅读这段代码我们发现,比较重要的是loadShards这个函数的调用。这个函数的内容比较长,主要展示比较核心的逻辑代码。

func (s *Store) loadShards() error {  
   // res holds the result from opening each shard in a goroutine  
   type res struct {  
      s   *Shard  
      err error  
   }  
   // ... 此处省略一些配置项
   resC := make(chan *res)  
   var n int  
	
   // Determine how many shards we need to open by checking the store path.  
   dbDirs, err := ioutil.ReadDir(s.path)  
   // dbDirs 指的是 data目录下一共有多少个db的目录
   if err != nil {  
      return err  
   }  
   // 依次处理每个DB的内容
   for _, db := range dbDirs { 
      dbPath := filepath.Join(s.path, db.Name())  
      if !db.IsDir() {  
         log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))  
         continue  
      }  
  
      if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {  
         log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))  
         continue  
      }  
	  
      // 加载这个DB的 _series 文件
      sfile, err := s.openSeriesFile(db.Name())  
      if err != nil {  
         return err  
      }  
  
      // 创建内存索引
      idx, err := s.createIndexIfNotExists(db.Name())  
      if err != nil {  
         return err  
      }  
  
      // Load each retention policy within the database directory.  
      // 加载这个db下的数据保存策略。
      rpDirs, err := ioutil.ReadDir(dbPath)  
      if err != nil {  
         return err  
      }  
	  // 遍历处理所有的数据保留策略
      for _, rp := range rpDirs {  
	     // 加载每个rp处理策略的文件
         rpPath := filepath.Join(s.path, db.Name(), rp.Name())  
         if !rp.IsDir() {  
            log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))  
            continue  
         }  
  
         // The .series directory is not a retention policy. 
         // 如果是rp是series目录 
         if rp.Name() == SeriesFileDirectory {  
            continue  
         }  
		  
         if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {  
            log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))  
            continue  
         } 
          
		 // 加载某个rp下的所有shard
         shardDirs, err := ioutil.ReadDir(rpPath)  
         if err != nil {  
            return err  
         }  
		 //  遍历所有的 shard
         for _, sh := range shardDirs {  
            // Series file should not be in a retention policy but skip just in case.  
            // 为什么shard下面也会有时间序列文件
            if sh.Name() == SeriesFileDirectory {  
               log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))  
               continue  
            }  
			// n 是为了记录一共有多少shard被处理过,为后续的处理作准备
            n++  
            go func(db, rp, sh string) {  
               t.Take()  
               defer t.Release()  
			   
               start := time.Now()  
               path := filepath.Join(s.path, db, rp, sh)  
               walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)  
			   // 将shard的id转换为base64 Id
               // Shard file names are numeric shardIDs  
               shardID, err := strconv.ParseUint(sh, 10, 64)  
               if err != nil {  
                  log.Info("invalid shard ID found at path", zap.String("path", path))  
                  resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}  
                  return  
               }  
  
               if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {  
                  log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))  
                  resC <- &res{}  
                  return  
               }  
  
               // Copy options and assign shared index.  
               opt := s.EngineOptions  
               opt.InmemIndex = idx  
  
               // Provide an implementation of the ShardIDSets  
               opt.SeriesIDSets = shardSet{store: s, db: db}  
  
               // Existing shards should continue to use inmem index.  
               if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {  
                  opt.IndexVersion = InmemIndexName  
               }  
  
               // 实例化一个shard对象
               shard := NewShard(shardID, path, walPath, sfile, opt)  
  
               // Disable compactions, writes and queries until all shards are loaded  
               shard.EnableOnOpen = false  
               shard.CompactionDisabled = s.EngineOptions.CompactionDisabled  
               shard.WithLogger(s.baseLogger)  
			   // 打开这个切片
               err = shard.Open()  
               if err != nil {  
                  log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))  
                  resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}  
                  return  
               }  
			   // 实例化 res对象
               resC <- &res{s: shard}  
               log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))  
            }(db.Name(), rp.Name(), sh.Name())  
         }  
      }  
   }  
  
   // Gather results of opening shards concurrently, keeping track of how  
   // many databases we are managing.   
   for i := 0; i < n; i++ {  
	  // 按顺序取出来上面准备好的 res对象
      res := <-resC  
      if res.s == nil || res.err != nil {  
         continue  
      }  
      // 建立shard映射,key为shardId,value 为shard对象
      s.shards[res.s.id] = res.s  
      s.epochs[res.s.id] = newEpochTracker()  
      //如果对应的DB没有在databases缓存中,则生成对应的db对象并缓存
      if _, ok := s.databases[res.s.database]; !ok {  
         s.databases[res.s.database] = new(databaseState)  
      }  
      s.databases[res.s.database].addIndexType(res.s.IndexType())  
   }  
   close(resC)  
  
   // Check if any databases are running multiple index types.  
   // 检查数据库是否存在多个不同的索引类型
   for db, state := range s.databases {  
      if state.hasMultipleIndexTypes() {  
         var fields []zapcore.Field  
         for idx, cnt := range state.indexTypes {  
            fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))  
         }  
         s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)  
      }  
   }  
  
   // Enable all shards  
   // 开启所有的shard
   for _, sh := range s.shards {  
      sh.SetEnabled(true)  
      if isIdle, _ := sh.IsIdle(); isIdle {  
         if err := sh.Free(); err != nil {  
            return err  
         }  
      }  
   }  
  
   return nil  
}

以上就是loadShard的逻辑,通过前面的代码,我们好像可以总结出来一些规律,influxdb中的初始化总是伴随着Open类型的函数的调用,同样的,我们在loadShards中也发现shard对象有Open的操作。

 err = shard.Open()  

点进去:

func (s *Shard) Open() error {  
   if err := func() error {  
      s.mu.Lock()  
      defer s.mu.Unlock()  
  
      // Return if the shard is already open  
      if s._engine != nil {  
         return nil  
      }  
  
      seriesIDSet := NewSeriesIDSet()  
  
      // Initialize underlying index.  
      ipath := filepath.Join(s.path, "index")  
	  // 创建一个index对象
      idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)  
      if err != nil {  
         return err  
      }  
  
      idx.WithLogger(s.baseLogger)  
  
      // Open index.  
      // 初始化该index对象
      if err := idx.Open(); err != nil {  
         return err  
      }  
      s.index = idx  
  
      // Initialize underlying engine.  
      // 初始化引擎
      e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)  
      if err != nil {  
         return err  
      }  
  
      // Set log output on the engine.  
      e.WithLogger(s.baseLogger)  
  
      // Disable compactions while loading the index  
      e.SetEnabled(false)  
  
      // Open engine.  
	  // 打开引擎
      if err := e.Open(); err != nil {  
         return err  
      }  
  
      // Load metadata index for the inmem index only.  
      if err := e.LoadMetadataIndex(s.id, s.index); err != nil {  
         return err  
      }  
      s._engine = e  
  
      return nil  
   }(); err != nil {  
      s.close()  
      return NewShardError(s.id, err)  
   }  
  
   if s.EnableOnOpen {  
      // enable writes, queries and compactions  
      s.SetEnabled(true)  
   }  
  
   return nil  
}

在shard的Open逻辑中,主要做了两件事情,一件是创建索引:

idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)  

shard 内存索引初始化

func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {  
   format := options.IndexVersion  
  
   // Use default format unless existing directory exists.  
   _, err := os.Stat(path)  
   if os.IsNotExist(err) {  
      // nop, use default  
   } else if err != nil {  
      return nil, err  
   } else if err == nil {  
      format = TSI1IndexName  
   }  
  
   // Lookup index by format.  
   fn := newIndexFuncs[format]  
   if fn == nil {  
      return nil, fmt.Errorf("invalid index format: %q", format)  
   }  
   return fn(id, database, path, seriesIDSet, sfile, options), nil  
}

可以看到这里最核心的逻辑在 fn函数的执行上,具体使用那种索引规则取决于用户的配置, 默认情况下使用的是inmem的实现。位置: github.com/influxdata/influxdb/tsdb/index/inmem.init

func init() { 
   tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {  
      return NewShardIndex(id, seriesIDSet, opt)  
   })  
}

// 这里生成了一个ShardIndex索引对象
func NewShardIndex(id uint64, seriesIDSet *tsdb.SeriesIDSet, opt tsdb.EngineOptions) tsdb.Index {  
   return &ShardIndex{  
      Index:        opt.InmemIndex.(*Index),  
      id:           id,  
      seriesIDSet:  seriesIDSet,  
      measurements: make(map[string]int),  
      opt:          opt,  
   }  
}


// 这个对象维护了一个索引对想
type ShardIndex struct {  
   id uint64 // shard id  
  
   *Index // Shared reference to global database-wide index.  
   // seriesIDSet 存储的是所有与该shard有关系的seriesId
   seriesIDSet *tsdb.SeriesIDSet
   // 每张表下series的计数
   measurements map[string]int
   // 一个配置对象
   opt tsdb.EngineOptions  
}


// 在inmem的实现中是一个open函数的空实现
func (i *Index) Open() (err error) { return nil }

介绍完Shard对象的初始化流程,我们来看下shard下engine的启动流程。

// Initialize underlying engine.  
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)  
if err != nil {  
   return err  
}  
  
// Set log output on the engine.  
e.WithLogger(s.baseLogger)  
  
// Disable compactions while loading the index  
e.SetEnabled(false)  
  
// Open engine.  
if err := e.Open(); err != nil {  
   return err  
}


//默认情况下,NewEngine使用的是tsm1的实现
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {  
   var wal *WAL  
   if opt.WALEnabled {  
      wal = NewWAL(walPath)  
      wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)  
   }  
  
   fs := NewFileStore(path)  
   fs.openLimiter = opt.OpenLimiter  
   if opt.FileStoreObserver != nil {  
      fs.WithObserver(opt.FileStoreObserver)  
   }  
   fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed  
  
   cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))  
  
   c := NewCompactor()  
   c.Dir = path  
   c.FileStore = fs  
   c.RateLimit = opt.CompactionThroughputLimiter  
  
   var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))  
   if opt.CompactionPlannerCreator != nil {  
      planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)  
      planner.SetFileStore(fs)  
   }  
  
   logger := zap.NewNop()  
   stats := &EngineStatistics{}  
   e := &Engine{  
      id:           id,  
      path:         path,  
      index:        idx,  
      sfile:        sfile,  
      logger:       logger,  
      traceLogger:  logger,  
      traceLogging: opt.Config.TraceLoggingEnabled,  
  
      WAL:   wal,  
      Cache: cache,  
  
      FileStore:      fs,  
      Compactor:      c,  
      CompactionPlan: planner,  
  
      CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),  
      CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),  
      enableCompactionsOnOpen:       true,  
      WALEnabled:                    opt.WALEnabled,  
      formatFileName:                DefaultFormatFileName,  
      stats:                         stats,  
      compactionLimiter:             opt.CompactionLimiter,  
      scheduler:                     newScheduler(stats, opt.CompactionLimiter.Capacity()),  
      seriesIDSets:                  opt.SeriesIDSets,  
   }  
  
   // Feature flag to enable per-series type checking, by default this is off and  
   // e.seriesTypeMap will be nil.   if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {  
      e.seriesTypeMap = radix.New()  
   }  
  
   if e.traceLogging {  
      fs.enableTraceLogging(true)  
      if e.WALEnabled {  
         e.WAL.enableTraceLogging(true)  
      }  
   }  
  
   return e  
}

这里生成了一个Engine对象,这个对象带有一个Open方法。

func (e *Engine) Open() error {  
. 
   //这里的path是某个shard的具体路径,如果存在,就创建
   if err := os.MkdirAll(e.path, 0777); err != nil {  
      return err  
   }  
  
   if err := e.cleanup(); err != nil {  
      return err  
   }  
   // 这里读到的是这个shard下有的field字段信息,key是对应的measurement命,value是下面的字段
   fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))  
   if err != nil {  
      e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v.  Rebuilding.", err))  
   }  
  
   e.mu.Lock()  
   e.fieldset = fields  
   e.mu.Unlock()  
  
   e.index.SetFieldSet(fields)  
  
   if e.WALEnabled {  
      if err := e.WAL.Open(); err != nil {  
         return err  
      }  
   }  
  
   if err := e.FileStore.Open(); err != nil {  
      return err  
   }  
  
   if e.WALEnabled {  
      if err := e.reloadCache(); err != nil {  
         return err  
      }  
   }  
  
   e.Compactor.Open()  
  
   if e.enableCompactionsOnOpen {  
      e.SetCompactionsEnabled(true)  
   }  
  
   return nil  
}

到此engine对象也初始化完成了,整个tsdb的初始化基本完成,基本上tsdb主要操作的是influxdb目录下的data目录,也就是实际存储数据的目录,依次初始化每个DB下每个RP下的每个Shard。