influxdb官方文档的启动命令除了常见的systemctl命令启动,还有其他的从配置文件启动:
influxd -config /etc/influxdb/influxdb.conf
结合influxdb的代码目录,influxd的启动入口应该在: influxdb/cmd/influxd/main.go
通过代码可以发现,还包含了如备份,恢复等操作的实现。
Run(args ...string) error {
name, args := cmd.ParseCommandName(args)
// Extract name from args.
switch name {
case "", "run":
cmd := run.NewCommand()
// Tell the server the build details.
cmd.Version = version
cmd.Commit = commit
cmd.Branch = branch
// 这里启动
if err := cmd.Run(args...); err != nil {
return fmt.Errorf("run: %s", err)
}
// 此处省略一部分代码
case "backup":
name := backup.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("backup: %s", err)
}
case "restore":
name := restore.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("restore: %s", err)
}
case "config":
if err := run.NewPrintConfigCommand().Run(args...); err != nil {
return fmt.Errorf("config: %s", err)
}
case "version":
if err := NewVersionCommand().Run(args...); err != nil {
return fmt.Errorf("version: %s", err)
}
case "help":
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help: %s", err)
}
default:
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
}
}
点进去
func (cmd *Command) Run(args ...string) error {
// 前面一大堆都在解析各种配置
s, err := NewServer(config, buildInfo)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile
// 这里实际进入了启动流程
if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}
}
点进去server对象的Open函数,这里一些核心的部件在这里启动,其中最主要的是TSDB。之后下面会分组件依次介绍。
func (s *Server) Open() error {
// 一些服务的特质,下面的代码就进入核心组件的启动流程了。
// Open TSDB store. 加载series 和 shards文件,同时加载database,meansurment,rp等信息。不过主要还是加载shards和series。
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
// 打开订阅者服务
// Open the subscriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}
// 开启数据写入服务
// Open the points writer service
if err := s.PointsWriter.Open(); err != nil {
return fmt.Errorf("open points writer: %s", err)
}
s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())
// 开启服务
for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
}
}
// 是否开启influxdb的使用上报服务,类似于上报一些数据以便于influxdb分析,持续改进
// Start the reporting service, if not disabled.
if !s.reportingDisabled {
go s.startServerReporting()
}
return nil
}
TSDB 组件启动
TSDB 涉及到的一些关键组件:留个坑,可以先看这篇文章: https://lrita.github.io/2017/06/12/influxdb-tsdb/
TSDB实例化是在上文提到过的NewServer函数中:
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
从这行代码可以看到,初始化一个tsdbstore实例需要的必要参数是指向influxdb存储目录data目录的路径。
func NewStore(path string) *Store {
logger := zap.NewNop()
return &Store{
databases: make(map[string]*databaseState),
path: path,
sfiles: make(map[string]*SeriesFile),
indexes: make(map[string]interface{}),
pendingShardDeletes: make(map[uint64]struct{}),
epochs: make(map[uint64]*epochTracker),
EngineOptions: NewEngineOptions(),
Logger: logger,
baseLogger: logger,
}
}
由于本篇文章只是梳理influxdb启动流程,关于Tsdb的内容后面我看明白了在写,言下之意现在我也没看明白。 Store对象有一个Open函数,Tsdb的初始化入口就在这里: 源码位置: tsdb/store.go:202
func (s *Store) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.opened {
// 防止重复初始化
// Already open
return nil
}
s.closing = make(chan struct{})
s.shards = map[uint64]*Shard{}
s.Logger.Info("Using data dir", zap.String("path", s.Path()))
// Create directory.
// 如果不存在data目录,则创建一个
if err := os.MkdirAll(s.path, 0777); err != nil {
return err
}
if err := s.loadShards(); err != nil {
return err
}
s.opened = true
if !s.EngineOptions.MonitorDisabled {
s.wg.Add(1)
go func() {
s.wg.Done()
s.monitorShards()
}()
}
return nil
}
阅读这段代码我们发现,比较重要的是loadShards这个函数的调用。这个函数的内容比较长,主要展示比较核心的逻辑代码。
func (s *Store) loadShards() error {
// res holds the result from opening each shard in a goroutine
type res struct {
s *Shard
err error
}
// ... 此处省略一些配置项
resC := make(chan *res)
var n int
// Determine how many shards we need to open by checking the store path.
dbDirs, err := ioutil.ReadDir(s.path)
// dbDirs 指的是 data目录下一共有多少个db的目录
if err != nil {
return err
}
// 依次处理每个DB的内容
for _, db := range dbDirs {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
}
if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
continue
}
// 加载这个DB的 _series 文件
sfile, err := s.openSeriesFile(db.Name())
if err != nil {
return err
}
// 创建内存索引
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}
// Load each retention policy within the database directory.
// 加载这个db下的数据保存策略。
rpDirs, err := ioutil.ReadDir(dbPath)
if err != nil {
return err
}
// 遍历处理所有的数据保留策略
for _, rp := range rpDirs {
// 加载每个rp处理策略的文件
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
if !rp.IsDir() {
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
continue
}
// The .series directory is not a retention policy.
// 如果是rp是series目录
if rp.Name() == SeriesFileDirectory {
continue
}
if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
continue
}
// 加载某个rp下的所有shard
shardDirs, err := ioutil.ReadDir(rpPath)
if err != nil {
return err
}
// 遍历所有的 shard
for _, sh := range shardDirs {
// Series file should not be in a retention policy but skip just in case.
// 为什么shard下面也会有时间序列文件
if sh.Name() == SeriesFileDirectory {
log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
continue
}
// n 是为了记录一共有多少shard被处理过,为后续的处理作准备
n++
go func(db, rp, sh string) {
t.Take()
defer t.Release()
start := time.Now()
path := filepath.Join(s.path, db, rp, sh)
walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
// 将shard的id转换为base64 Id
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
return
}
if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
resC <- &res{}
return
}
// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
// Provide an implementation of the ShardIDSets
opt.SeriesIDSets = shardSet{store: s, db: db}
// Existing shards should continue to use inmem index.
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
opt.IndexVersion = InmemIndexName
}
// 实例化一个shard对象
shard := NewShard(shardID, path, walPath, sfile, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)
// 打开这个切片
err = shard.Open()
if err != nil {
log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
return
}
// 实例化 res对象
resC <- &res{s: shard}
log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
}(db.Name(), rp.Name(), sh.Name())
}
}
}
// Gather results of opening shards concurrently, keeping track of how
// many databases we are managing.
for i := 0; i < n; i++ {
// 按顺序取出来上面准备好的 res对象
res := <-resC
if res.s == nil || res.err != nil {
continue
}
// 建立shard映射,key为shardId,value 为shard对象
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
//如果对应的DB没有在databases缓存中,则生成对应的db对象并缓存
if _, ok := s.databases[res.s.database]; !ok {
s.databases[res.s.database] = new(databaseState)
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
close(resC)
// Check if any databases are running multiple index types.
// 检查数据库是否存在多个不同的索引类型
for db, state := range s.databases {
if state.hasMultipleIndexTypes() {
var fields []zapcore.Field
for idx, cnt := range state.indexTypes {
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
}
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
}
}
// Enable all shards
// 开启所有的shard
for _, sh := range s.shards {
sh.SetEnabled(true)
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
return err
}
}
}
return nil
}
以上就是loadShard的逻辑,通过前面的代码,我们好像可以总结出来一些规律,influxdb中的初始化总是伴随着Open类型的函数的调用,同样的,我们在loadShards
中也发现shard对象有Open的操作。
err = shard.Open()
点进去:
func (s *Shard) Open() error {
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
// Return if the shard is already open
if s._engine != nil {
return nil
}
seriesIDSet := NewSeriesIDSet()
// Initialize underlying index.
ipath := filepath.Join(s.path, "index")
// 创建一个index对象
idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
if err != nil {
return err
}
idx.WithLogger(s.baseLogger)
// Open index.
// 初始化该index对象
if err := idx.Open(); err != nil {
return err
}
s.index = idx
// Initialize underlying engine.
// 初始化引擎
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
if err != nil {
return err
}
// Set log output on the engine.
e.WithLogger(s.baseLogger)
// Disable compactions while loading the index
e.SetEnabled(false)
// Open engine.
// 打开引擎
if err := e.Open(); err != nil {
return err
}
// Load metadata index for the inmem index only.
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
return err
}
s._engine = e
return nil
}(); err != nil {
s.close()
return NewShardError(s.id, err)
}
if s.EnableOnOpen {
// enable writes, queries and compactions
s.SetEnabled(true)
}
return nil
}
在shard的Open逻辑中,主要做了两件事情,一件是创建索引:
idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
shard 内存索引初始化
func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = TSI1IndexName
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, database, path, seriesIDSet, sfile, options), nil
}
可以看到这里最核心的逻辑在 fn函数的执行上,具体使用那种索引规则取决于用户的配置, 默认情况下使用的是inmem的实现
。位置: github.com/influxdata/influxdb/tsdb/index/inmem.init
func init() {
tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
return NewShardIndex(id, seriesIDSet, opt)
})
}
// 这里生成了一个ShardIndex索引对象
func NewShardIndex(id uint64, seriesIDSet *tsdb.SeriesIDSet, opt tsdb.EngineOptions) tsdb.Index {
return &ShardIndex{
Index: opt.InmemIndex.(*Index),
id: id,
seriesIDSet: seriesIDSet,
measurements: make(map[string]int),
opt: opt,
}
}
// 这个对象维护了一个索引对想
type ShardIndex struct {
id uint64 // shard id
*Index // Shared reference to global database-wide index.
// seriesIDSet 存储的是所有与该shard有关系的seriesId
seriesIDSet *tsdb.SeriesIDSet
// 每张表下series的计数
measurements map[string]int
// 一个配置对象
opt tsdb.EngineOptions
}
// 在inmem的实现中是一个open函数的空实现
func (i *Index) Open() (err error) { return nil }
介绍完Shard对象的初始化流程,我们来看下shard下engine的启动流程。
// Initialize underlying engine.
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
if err != nil {
return err
}
// Set log output on the engine.
e.WithLogger(s.baseLogger)
// Disable compactions while loading the index
e.SetEnabled(false)
// Open engine.
if err := e.Open(); err != nil {
return err
}
//默认情况下,NewEngine使用的是tsm1的实现
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
var wal *WAL
if opt.WALEnabled {
wal = NewWAL(walPath)
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}
fs := NewFileStore(path)
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
c := NewCompactor()
c.Dir = path
c.FileStore = fs
c.RateLimit = opt.CompactionThroughputLimiter
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
if opt.CompactionPlannerCreator != nil {
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
planner.SetFileStore(fs)
}
logger := zap.NewNop()
stats := &EngineStatistics{}
e := &Engine{
id: id,
path: path,
index: idx,
sfile: sfile,
logger: logger,
traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled,
WAL: wal,
Cache: cache,
FileStore: fs,
Compactor: c,
CompactionPlan: planner,
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
WALEnabled: opt.WALEnabled,
formatFileName: DefaultFormatFileName,
stats: stats,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,
}
// Feature flag to enable per-series type checking, by default this is off and
// e.seriesTypeMap will be nil. if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {
e.seriesTypeMap = radix.New()
}
if e.traceLogging {
fs.enableTraceLogging(true)
if e.WALEnabled {
e.WAL.enableTraceLogging(true)
}
}
return e
}
这里生成了一个Engine对象,这个对象带有一个Open方法。
func (e *Engine) Open() error {
.
//这里的path是某个shard的具体路径,如果存在,就创建
if err := os.MkdirAll(e.path, 0777); err != nil {
return err
}
if err := e.cleanup(); err != nil {
return err
}
// 这里读到的是这个shard下有的field字段信息,key是对应的measurement命,value是下面的字段
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))
if err != nil {
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
}
e.mu.Lock()
e.fieldset = fields
e.mu.Unlock()
e.index.SetFieldSet(fields)
if e.WALEnabled {
if err := e.WAL.Open(); err != nil {
return err
}
}
if err := e.FileStore.Open(); err != nil {
return err
}
if e.WALEnabled {
if err := e.reloadCache(); err != nil {
return err
}
}
e.Compactor.Open()
if e.enableCompactionsOnOpen {
e.SetCompactionsEnabled(true)
}
return nil
}
到此engine对象也初始化完成了,整个tsdb的初始化基本完成,基本上tsdb主要操作的是influxdb目录下的data目录,也就是实际存储数据的目录,依次初始化每个DB下每个RP下的每个Shard。