掘金 后端 ( ) • 2024-04-02 00:06

meta 指的是influxdb的源信息,在磁盘上的位置是meta目录下的meta.db, meta.db 存储了一些数据库,RP, Shard, 连续查询等信息。 关于meta部分的代码在 influxdb/services/meta/config.go 这个目录下。 我们把目光聚集在data.go这个文件中。

第一部分 Meta 的数据结构

首先点进去映入眼帘的就是一个名叫Data的结构体定义。

// Data表示所有元数据的最上层集合(来对地方了)
// Data represents the top level collection of all metadata.
type Data struct {  
   Term      uint64 // associated raft term  
   Index     uint64 // associated raft index  
   ClusterID uint64  
   Databases []DatabaseInfo  
   Users     []UserInfo  
  
   // adminUserExists provides a constant time mechanism for determining   
   // if there is at least one admin user.  
   adminUserExists bool  
  
   MaxShardGroupID uint64  
   MaxShardID      uint64  
}  

这里的话,Term和Index这两个属性在开源版是没有用到的,主要用在集群版里面,通过注释也可以看出来一些端倪,集群版分布式一致性算法采用的是raft算法。

既然注释中说Data是元数据的最上层的概念,那说明Databases,Users 位于Data的下层。而MaxShardGroupID,MaxShardID 应该是为了便于查询或者计算而定义的。我们先从DatabaseInfo 来看。

type DatabaseInfo struct {  
   Name                   string  
   DefaultRetentionPolicy string  
   RetentionPolicies      []RetentionPolicyInfo  
   ContinuousQueries      []ContinuousQueryInfo  
}

DatabaseInfo 存的是database的详情信息,包含Name,默认的RP,该数据库下保留的RP列表, 以及连续查询策略。其他没有了。

我们继续往下看,看看RetentionPolicyInfo 是不是还有什么。

// 数据保留策略代表一个 retention policy 的元数据
// RetentionPolicyInfo represents metadata about a retention policy.
type RetentionPolicyInfo struct {  
   Name               string   //RP的名次
   ReplicaN           int  // 副本数
   Duration           time.Duration  //时间范围
   ShardGroupDuration time.Duration  // shardGroup实际范围
   ShardGroups        []ShardGroupInfo  //对应的ShardGroup
   Subscriptions      []SubscriptionInfo  
}

RetentionPolicyInfo 下并不会直接关联Shard,而是会关联到一个ShardGroup的概念上去。ShardGroup是shard的逻辑分组,逻辑分组的意思表示,shardGroup和shard不一样,并不会实际存储在磁盘中,shardGroup和rp策略是强相关,大家可以理解为一个ShardGroup存储的是某个特定时间范围内的数据。

我们继续往下看ShardGroups

// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system  
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can// safely delete any associated shards.  
type ShardGroupInfo struct {  
   ID          uint64  
   StartTime   time.Time  
   EndTime     time.Time  
   DeletedAt   time.Time  
   Shards      []ShardInfo  
   TruncatedAt time.Time  
}

DeletedAt 字段标记了一个shardGroup的删除时间,如果被设置,则说明该ShardGroup下的Shard是可以被安全删除的。Shards 是一组Shard列表。下面是Shard下面的概念。

// ShardInfo represents metadata about a shard.
type ShardInfo struct {  
   ID     uint64  
   Owners []ShardOwner  
}

到Shard这一层,我们发现没有什么再往下可以探寻的结构了,Owners虽然是一个ShardOwner的列表,但是ShardOwner表示的是这个Shard的归属的节点,不需要继续向下看了。

到这里 RetentionPolicies 这条线我们已经看到最下层了,我们接下来看ContinuousQueries连续查询。熟悉influxdb的应该可以看出来,ContinuousQueries 应该只是记录一些连续查询的配置,不会有复杂的信息。

// ContinuousQueryInfo represents metadata about a continuous query.
type ContinuousQueryInfo struct {  
   Name  string  
   Query string  
}

到这一步,相信大家对meta最基本的数据结构已经有一个最基本的认识了,我们甚至还可以画一张图。

image.png

第二部分

ShardGroup 创建

我们继续从data目录寻找线索,看一下什么时候会创建一个shardGroup并写进去元信息中呢?果不其然,找到了这样一个方法。

可以看到创建一个shardGroup的时候指定了StartTimeEndTime 可以看到EndTime - StartTime的值就是一个ShardGroupDuration的值。

这个怎么理解呢? 假设我们的某个RP的值是2w (2周),我们指定每一周为一个shardGroup,那当influxdb运行两周之后,元数据的数据结构可能就是这样: Pasted image 20230125222936.png

// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
// 在数据库给定的时间戳上创建一个shardgroup
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {  
   // Find retention policy.  
   rpi, err := data.RetentionPolicy(database, policy)  
   if err != nil {  
      return err  
   } else if rpi == nil {  
      return influxdb.ErrRetentionPolicyNotFound(policy)  
   }  
  
   // Verify that shard group doesn't already exist for this timestamp.  
   if rpi.ShardGroupByTimestamp(timestamp) != nil {  
      return nil  
   }  
  
   // Create the shard group.  
   data.MaxShardGroupID++  
   sgi := ShardGroupInfo{}  
   sgi.ID = data.MaxShardGroupID  
   sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()  
   sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()  
   if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {  
      // Shard group range is [start, end) so add one to the max time.  
      sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)  
   }  
  
   data.MaxShardID++  
   sgi.Shards = []ShardInfo{  
      {ID: data.MaxShardID},  
   }  
  
   // Retention policy has a new shard group, so update the policy. Shard  
   // Groups must be stored in sorted order, as other parts of the system   // assume this to be the case.   rpi.ShardGroups = append(rpi.ShardGroups, sgi)  
   sort.Sort(ShardGroupInfos(rpi.ShardGroups))  
  
   return nil  
}

那什么时候会调用CreateShardGroup 进行ShardGroup的创建呢? 根据我们对上图的理解,shardGroup的创建只会在一个ShardGroupDuration 结束的时候创建,比如第二周的数据,那肯定要到第一周过完了才会需要一个新的 ShardGroup, 但是这个ShardGroup创建的时机是系统定期检查自动创建的呢? 还是用到的时候发现没有临时创建出来的呢?

说实话,我不知道。我们只能顺藤摸瓜向上找。我们通过查询调用信息,发现除了单元测试之外,有个地方调用了data模块下的 CreateShardGroup 函数。位于同样属于meta模块下的clinet.go文件中。

这个方法主要是对data模块的调用,可见关于shardGroup以及对元信息模块的调用基本上都是委托给data模块的(influxdb的设计美学,权责还是比较分明的)

func createShardGroup(data *Data, database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {  
   // It is the responsibility of the caller to check if it exists before calling this method.  
   if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {  
      return nil, ErrShardGroupExists  
   }  
  
   if err := data.CreateShardGroup(database, policy, timestamp); err != nil {  
      return nil, err  
   }  
  
   rpi, err := data.RetentionPolicy(database, policy)  
   if err != nil {  
      return nil, err  
   } else if rpi == nil {  
      return nil, errors.New("retention policy deleted after shard group created")  
   }  
   // 按照时间排序
   sgi := rpi.ShardGroupByTimestamp(timestamp)  
   return sgi, nil  
}

但是clinet下的createShardGroup 显然还不是我们要找的答案。我们先找下同模块下有没有对这个方法的调用,没有再去其他地方碰碰运气。

运气真好,clinet模块下刚好有我们想要的东西。

这个函数叫预创建ShardGroups,看来我们今天运气不错,我们先大致看下描述。大致意思就是需要在下一个shardgroup写进来之前把shardgroup创建好。

// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data  
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation  
// avoids taking the hit at write-time.  
func (c *Client) PrecreateShardGroups(from, to time.Time) error {  
   c.mu.Lock()  
   defer c.mu.Unlock()  
   data := c.cacheData.Clone()  
   var changed bool  
   // 遍历所有的database
   for _, di := range data.Databases {  
      for _, rp := range di.RetentionPolicies {  
         if len(rp.ShardGroups) == 0 {  
            // No data was ever written to this group, or all groups have been deleted.  
            continue  
         }  
         g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.  
         if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {  
            // Group is not deleted, will end before the future time, but is still yet to expire.  
            // This last check is important, so the system doesn't create shards groups wholly            // in the past.  
            // Create successive shard group.            nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)  
            // if it already exists, continue  
            if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {  
               c.logger.Info("Shard group already exists",  
                  logger.ShardGroup(sg.ID),  
                  logger.Database(di.Name),  
                  logger.RetentionPolicy(rp.Name))  
               continue  
            }  
            newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)  
            if err != nil {  
               c.logger.Info("Failed to precreate successive shard group",  
                  zap.Uint64("group_id", g.ID), zap.Error(err))  
               continue  
            }  
            changed = true  
            c.logger.Info("New shard group successfully precreated",  
               logger.ShardGroup(newGroup.ID),  
               logger.Database(di.Name),  
               logger.RetentionPolicy(rp.Name))  
         }  
      }  
   }  
  
   if changed {  
      if err := c.commit(data); err != nil {  
         return err  
      }  
   }  
  
   return nil  
}

到这一步仍然不是我们想要寻找的东西,但是线索已经尽在眼前了,我们找一下PrecreateShardGroups调用的位置。运气不错,一下子就找到了。

我们最终在/Users/hanshu/Code/go_project/influxdb/services/precreator/service.go 找到了相关的调用,从注释可以看到,influxdb内部维护了一个定时器,周期检查是否需要创建新的shardGroup。至此关于ShardGroup的内容就暂时分析完了。

// 不断检查是否需要预创建
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {  
   defer s.wg.Done()  
  
   for {  
      select {  
      case <-time.After(s.checkInterval):  
         if err := s.precreate(time.Now().UTC()); err != nil {  
            s.Logger.Info("Failed to precreate shards", zap.Error(err))  
         }  
      case <-s.done:  
         s.Logger.Info("Terminating precreation service")  
         return  
      }  
   }  
}  
  
// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {  
   cutoff := now.Add(s.advancePeriod).UTC()  
   return s.MetaClient.PrecreateShardGroups(now, cutoff)  
}

RP 的创建

相较于ShardGroup的创建,RP的创建要简单许多,RP意为数据保留策略,数据保留策略的创建只会有两个地方会调用。

  • 创建数据库的时候,(如果指定了RP, 创建该RP,如果没有指定,则默认RP,autogen)
  • 通过命令行/http的方式使用influxdb sql 创建新的rp
// CreateRetentionPolicy creates a new retention policy on a database.
// It returns an error if name is blank or if the database does not exist.  
// 为database创建一个新的rp,如果name为空或者数据库不存在将会报错
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {  
   // Validate retention policy. 
   // 检查rpinfo 
   if rpi == nil {  
      return ErrRetentionPolicyRequired  
   } else if rpi.Name == "" {  
      return ErrRetentionPolicyNameRequired  
   } else if len(rpi.Name) > MaxNameLen {  
      return ErrNameTooLong  
   } else if rpi.ReplicaN < 1 {  
      return ErrReplicationFactorTooLow  
   }  
  
   // Normalise ShardDuration before comparing to any existing  
   // retention policies. The client is supposed to do this, but 
     // do it again to verify input.  
     // 归一化ShardGroupDuration到一个合理的范围内
    rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)  
   // 如果rp的Duration小于ShardGroupDuration 是不合理的,会报错
   if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {  
      return ErrIncompatibleDurations  
   }  
  
   // Find database.  
   di := data.Database(database)  
   if di == nil {  
      return influxdb.ErrDatabaseNotFound(database)  
   } else if rp := di.RetentionPolicy(rpi.Name); rp != nil {  
      // RP with that name already exists. Make sure they're the same.  
      if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {  
         return ErrRetentionPolicyExists  
      }  
      // if they want to make it default, and it's not the default, it's not an identical command so it's an error  
      if makeDefault && di.DefaultRetentionPolicy != rpi.Name {  
         return ErrRetentionPolicyConflict  
      }  
      return nil  
   }  
  
   // Append copy of new policy.  
   di.RetentionPolicies = append(di.RetentionPolicies, *rpi)  
  
   // Set the default if needed  
   if makeDefault {  
      di.DefaultRetentionPolicy = rpi.Name  
   }  
  
   return nil  
}

RP的创建位置

接下来我们截取两个代码片段去看一下RP具体是在什么位置进行创建的。

在创建数据库的时候顺便创建

// CreateDatabase creates a database or returns it if it already exists.
 func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {  
   c.mu.Lock()  
   defer c.mu.Unlock()  
  
   data := c.cacheData.Clone()  
  
   if db := data.Database(name); db != nil {  
      return db, nil  
   }  
  
   if err := data.CreateDatabase(name); err != nil {  
      return nil, err  
   }  

   // create default retention policy  
   if c.retentionAutoCreate {  
      rpi := DefaultRetentionPolicyInfo()  
      if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {  
         return nil, err  
      }  
   }  
  
   db := data.Database(name)  
  
   if err := c.commit(data); err != nil {  
      return nil, err  
   }  
  
   return db, nil  
}

使用influx sql在命令行创建

func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {  
   if !meta.ValidName(stmt.Name) {  
      // TODO This should probably be in `(*meta.Data).CreateRetentionPolicy`  
      // but can't go there until 1.1 is used everywhere  
      return meta.ErrInvalidName  
   }  
  
   spec := meta.RetentionPolicySpec{  
      Name:               stmt.Name,  
      Duration:           &stmt.Duration,  
      ReplicaN:           &stmt.Replication,  
      ShardGroupDuration: stmt.ShardGroupDuration,  
   }  
  
   // Create new retention policy.  
   _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default)  
   return err  
}