掘金 后端 ( ) • 2024-04-24 10:40

ShardingCore是一款efcore下高性能、轻量级针对分表分库读写分离的框架。正好多租户需要用到分库分表,因此选择了这个框架,其官方文档提供了详细的多租户教程,我实现的过程中绝大部分照搬了这个教程。最后架子搭起来,写个文章记录一下,也让自己理解的深入一些。

1. 多租户

多租户主要有三种实现的方式,多数据库,单数据库多架构(Schema),单数据库单架构。简单来说就是不同的租户的数据是存在不同的数据库,或者一个数据库的不同架构,或者仅仅以表中的某个字段来区分,由此可以很容易的理清三种方案在成本、编写逻辑、维护、数据备份与恢复等方面的优劣势。

因为ShardingCore并没有直接提供分架构的用法,我又想采取折中方案,即不同的租户的数据存储在相同数据库的不同的Schema中,所以我的设想是使用ShardingCore分库的方法来实现分架构,然后我失败了。这里需要简单介绍一下我的数据库,项目选用的是PostgreSQL,因为要存储地理数据,因此数据库加装了postgis扩展,项目中引用了Npgsql.EntityFrameworkCore.PostgreSQL.NetTopologySuite库。这时候问题来了,PostgreSQL数据库中postgis扩展只能属于一个架构,没法所有架构都用,而我每个租户有各自的地理数据,因此没法采用这种方案,最终选择了成本最高,隔离性和安全性最好的多数据库的方案。

2. 租户与用户

这部分是我与官方教程比较大的区别。我的想法是租户是租户,用户是用户,一个租户可以有很多个用户,租户的管理员只是租户用户中的一个,因此,租户存在主数据库中,用户存在租户数据库中。

主数据库的DbContext

public class DefaultDbContext: DbContext
{
    public DefaultDbContext(DbContextOptions<DefaultDbContext> options) : base(options)
    {
    }

    protected override void OnConfiguring(DbContextOptionsBuilder builder)
    {
        base.OnConfiguring(builder);
    }

    /// <summary>
    /// 租户表
    /// </summary>
    public DbSet<Tenant> Tenants { get; set; }

    protected override void OnModelCreating(ModelBuilder builder)
    {
        base.OnModelCreating(builder);
        string assemblyName = AssemblyName.GetAssemblyName(Assembly.GetExecutingAssembly().Location).FullName;
        Assembly assembly = Assembly.Load(assemblyName);
        foreach (var type in assembly.GetTypes())
        {
            var baseclass = type.BaseType;
            if (baseclass == typeof(EntityBase))
            {
                //种子数据
                MethodInfo method = type.GetMethod("HasData");
                if (method != null)
                {
                    var data = (IEnumerable<object>)method.Invoke(null, null);
                    builder.Entity(type.FullName).HasData(data);
                }
                else
                {
                    builder.Entity(type.FullName);
                }
            }
        }
    }
}

租户数据库的DbContext

public class TenantDbContext: AbstractShardingDbContext
{
    public TenantDbContext(DbContextOptions<TenantDbContext> options) : base(options)
    {
    }
    
    /// <summary>
    /// 用户表
    /// </summary>
    public DbSet<User> Users { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        string assemblyName = AssemblyName.GetAssemblyName(Assembly.GetExecutingAssembly().Location).FullName;
        Assembly assembly = Assembly.Load(assemblyName);
        foreach (var type in assembly.GetTypes())
        {
            var baseclass = type.BaseType;
            if (baseclass == typeof(TenantEntityBase))
            {
                //种子数据
                MethodInfo method = type.GetMethod("HasData");
                if (method != null)
                {
                    var data = (IEnumerable<object>)method.Invoke(null, null);
                    modelBuilder.Entity(type.FullName).HasData(data);
                }
                else
                {
                    modelBuilder.Entity(type.FullName);
                }
            }
        }
    }

这里TenantDbContext继承自AbstractShardingDbContext,是ShardingCore分库的配置。

我的想法是系统的超级管理员也只是系统中一个特别的租户而已,也就是这个租户可以管理其他租户,类似于二房东,也住这栋楼里。所以超级管理员登录,跟一个普通租户的普通用户登录应该没有什么区别,因此我额外加了一个常量,来记录这个二房东的ID。

/// <summary>
/// 租户相关的常量
/// </summary>
public static class TenantConstant
{
    /// <summary>
    /// 系统租户ID
    /// </summary>
    public const long SYSTEM_TENANT_ID = 1;
}

3. 租户与DbContext

每个租户是不同的数据库,有不同的连接字符串,因此不能如下直接注入,得对每个租户分别注入TenantDbContext。

builder.Services.AddDbContext<DefaultDbContext>(options => {
    options.UseNpgsql(builder.Configuration.GetConnectionString("NpgContext")); 
});

因为使用了ShardingCore,他允许通过IShardingRuntimeContext来进行注入。首先构造一个租户与注入参数的中间类:

/// <summary>
/// 租户分库配置
/// </summary>
public class ShardingTenantOptions
{
    /// <summary>
    /// 默认数据源名称
    /// </summary>
    public string DefaultDataSourceName { get; set; }
    /// <summary>
    /// 默认数据库地址
    /// </summary>
    public string DefaultConnectionString { get; set; }
    /// <summary>
    /// 分片迁移的命名空间关键字
    /// </summary>
    public string MigrationNamespace { get; set; }
}

创建一个IShardingRuntimeContext构建器

public class ShardingBuilder : IShardingBuilder
{
    public static readonly ILoggerFactory efLogger = LoggerFactory.Create(builder =>
    {
        builder.AddFilter((category, level) =>
            category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
    });
    private readonly IServiceProvider _serviceProvider;
    private readonly IConfiguration _configuration;

    public ShardingBuilder(IServiceProvider serviceProvider, IConfiguration configuration)
    {
        _serviceProvider = serviceProvider;
        _configuration = configuration;
    }
    public IShardingRuntimeContext Build(ShardingTenantOptions tenantOptions)
    {
        var shardingRuntimeBuilder = new ShardingRuntimeBuilder<TenantDbContext>()
            .UseRouteConfig(o =>
            {
                //只有超级管理员租户才需要用到ShardingCore的分库特性,其他租户只需要用到自己的数据库
                if (tenantOptions.DefaultDataSourceName == $"tenant_{TenantConstant.SYSTEM_TENANT_ID}")
                {
                    o.AddTenantDataSourceRoute();
                }
            })
            .UseConfig(o =>
            {
                o.ThrowIfQueryRouteNotMatch = false;
                o.UseShardingQuery((conStr, builder) =>
                {
                    builder.UseNpgsql(conStr, b => b.UseNetTopologySuite(geographyAsDefault: true))
                            .UseMigrationNamespace(new NpqsqlMigrationNamespace(tenantOptions.MigrationNamespace));
                    builder.UseLoggerFactory(efLogger)
                        .UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking)
                        .ReplaceService<IMigrationsAssembly, MultiDatabaseMigrationsAssembly>();
                });
                o.UseShardingTransaction((connection, builder) =>
                {
                    builder
                            .UseNpgsql(connection, b => b.UseNetTopologySuite(geographyAsDefault: true));
                    builder.UseLoggerFactory(efLogger)
                        .UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking);
                });
                o.AddDefaultDataSource(tenantOptions.DefaultDataSourceName, tenantOptions.DefaultConnectionString);
                ////超级管理员租户需要用到ShardingCore的分库特性,需要添加额外数据库
                if (tenantOptions.DefaultDataSourceName == $"tenant_{TenantConstant.SYSTEM_TENANT_ID}")
                {
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        var defaultDbContext = scope.ServiceProvider.GetRequiredService<DefaultDbContext>();
                        var tenantIds = defaultDbContext.Tenants.Where(t => t.Id != TenantConstant.SYSTEM_TENANT_ID).Select(t => t.Id).ToList();
                        var connectionStr = _configuration["ConnectionStrings:NpgContext"];
                        var baseDbName = _configuration["Tenant:BaseDb"];
                        var dsource = new Dictionary<string, string>();
                        foreach (var tid in tenantIds)
                        {
                            var dbName = _configuration["Tenant:TenantDbTemp"].Replace("{tenant}", $"Tenant{tid}");
                            dsource.Add($"tenant_{tid}", connectionStr.Replace(baseDbName, dbName));
                        }
                        o.AddExtraDataSource(sp =>
                        {
                            return dsource;
                        });
                    }
                }
                //注意这个迁移必须要十分重要
                o.UseShardingMigrationConfigure(b =>
                {
                    b.ReplaceService<IMigrationsSqlGenerator, ShardingNpsqlMigrationsSqlGenerator>();
                });
            }).AddServiceConfigure(s =>
            {
                //IShardingRuntimeContext内部的依赖注入
                s.AddSingleton(tenantOptions);
            });

        shardingRuntimeBuilder.ReplaceService<ITableEnsureManager, PostgreSqlTableEnsureManager>(ServiceLifetime.Singleton);
        return shardingRuntimeBuilder.Build(_serviceProvider);
    }
}

代码中有大量关于迁移的内容以及超级管理员租户专用的分库配置,下面详细说明。用起来通常是通过租户信息实例化一个ShardingTenantOptions,用来Build一个IShardingRuntimeContext,通过租户管理器与租户一一对应存下来。

var shardingRuntimeContext = _shardingBuilder.Build(new ShardingTenantOptions() {
    DefaultDataSourceName = $"tenant_{tenant.Id}",
    DefaultConnectionString = connectionStr,
    MigrationNamespace = $"T{tenant.Id}",
});

_tenantManager.AddTenantSharding(tenant.Id, shardingRuntimeContext);

最后通过IShardingRuntimeContext来一一注入TenantDbContext

builder.Services.AddDbContext<TenantDbContext>((sp, b) =>
{
    var tenantManager = sp.GetRequiredService<ITenantManager>();
    var currentTenantContext = tenantManager.GetCurrentTenantContext();
    //如果有上下文那么创建租户dbcontext
    if (currentTenantContext != null)
    {
        var shardingRuntimeContext = currentTenantContext.GetShardingRuntimeContext();
        b.UseDefaultSharding<TenantDbContext>(shardingRuntimeContext);
    }
});

4. 租户管理器

上面提到了租户管理器将租户和租户的DbContext(IShardingRuntimeContext)管理起来,这是租户管理器的核心功能,为了实现更灵活的管理和解耦,对管理做了如下设计:

  1. 租户管理器管理租户域
public interface ITenantManager
{
    /// <summary>
    /// 获取所有的租户
    /// </summary>
    /// <returns></returns>
    List<long> GetAll();
    /// <summary>
    /// 获取当前租户
    /// </summary>
    /// <returns></returns>
    TenantContext GetCurrentTenantContext();
    /// <summary>
    /// 添加租户信息
    /// </summary>
    /// <param name="tenantId"></param>
    /// <param name="shardingRuntimeContext"></param>
    /// <returns></returns>
    bool AddTenantSharding(long tenantId, IShardingRuntimeContext shardingRuntimeContext);

    /// <summary>
    /// 创建租户环境
    /// </summary>
    /// <param name="tenantId"></param>
    /// <returns></returns>
    TenantScope CreateScope(long tenantId);
}

public class TenantManager:ITenantManager
{
    private readonly ITenantContextAccessor _tenantContextAccessor;
    private readonly ConcurrentDictionary<string, IShardingRuntimeContext> _cache = new();

    public TenantManager(ITenantContextAccessor tenantContextAccessor)
    {
        _tenantContextAccessor = tenantContextAccessor;
    }

    public List<long> GetAll()
    {
        return _cache.Keys.Select(k=>Convert.ToInt64(k.Replace("tenant_",""))).ToList();
    }

    public TenantContext GetCurrentTenantContext()
    {
        return _tenantContextAccessor.TenantContext;
    }

    public bool AddTenantSharding(long tenantId, IShardingRuntimeContext shardingRuntimeContext)
    {
        return _cache.TryAdd($"tenant_{tenantId}", shardingRuntimeContext);
    }

    public TenantScope CreateScope(long tenantId)
    {
        if (!_cache.TryGetValue($"tenant_{tenantId}", out var shardingRuntimeContext))
        {
            throw new InvalidOperationException("未找到对应租户的配置");
        }

        _tenantContextAccessor.TenantContext = new TenantContext(shardingRuntimeContext, tenantId);
        return new TenantScope(_tenantContextAccessor);
    }
}
  1. 租户域中通过租户上下文定位器管理租户上下文
public class TenantScope: IDisposable
{
    public TenantScope(ITenantContextAccessor tenantContextAccessor)
    {
        TenantContextAccessor = tenantContextAccessor;
    }

    public ITenantContextAccessor TenantContextAccessor { get; }

    public void Dispose()
    {
        TenantContextAccessor.TenantContext = null;
    }
}

public interface ITenantContextAccessor
{
    TenantContext? TenantContext { get; set; }
}

public class TenantContextAccessor : ITenantContextAccessor
{
    private static readonly AsyncLocal<TenantContext?> _tenantContext = new AsyncLocal<TenantContext?>();
    public TenantContext? TenantContext
    {
        get => _tenantContext.Value;
        set => _tenantContext.Value = value;
    }
}
  1. 租户上下文中存储ShardingRuntimeContext
public class TenantContext
{
    private readonly IShardingRuntimeContext _shardingRuntimeContext;
    private readonly long _tenantId;

    public TenantContext(IShardingRuntimeContext shardingRuntimeContext, long tenantId)
    {
        _shardingRuntimeContext = shardingRuntimeContext;
        _tenantId = tenantId;
    }
    public IShardingRuntimeContext GetShardingRuntimeContext()
    {
        return _shardingRuntimeContext;
    }

    public long GetTenantId() { return _tenantId;}
}

并实现以下业务:

  1. 系统初始化时在管理器中加入所有可用租户上下文
  2. 添加租户时在管理器中添加新的租户上下文
  3. 某个租户登录后请求系统时通过租户管理去创建域确保该租户所有的请求都在该域下进行,访问其能访问的数据库
  4. 部分需要全库的操作比如登陆验证,租户管理器可以手动创建能访问全部数据库管理员租户域
  5. 删除或者禁用租户时在管理器中删除

5. Code-First 迁移

原生EntityFramework Core框架不支持一个DBContext来迁移多个数据库,但ShardingCore的作者大佬做了这样的方案,并分享了出来,我基本是照搬他的博客的。唯一的不同是大佬的数据库是固定的几个,我这边做了个根据租户ID来确定数据库名和连接字符串的简单方案。

var dbName = configuration["Tenant:TenantDbTemp"].Replace("{tenant}", $"Tenant{tenant.Id}");
var option = new ShardingTenantOptions()
{
     DefaultDataSourceName = $"tenant_{tenant.Id}",
     DefaultConnectionString = connectionStr.Replace(baseDbName,dbName),
     MigrationNamespace = $"T{tenant.Id}",
};
var shardingRuntimeContext = shardingBuilder.Build(option);
tenantManager.AddTenantSharding(tenant.Id, shardingRuntimeContext);

//创建数据库
string sql = $"SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_database u where u.datname='{dbName}')";
bool exist = Convert.ToBoolean(defaultDbContext.Database.ExecuteScalar(sql));

if (!exist)
{
     string tsql = $"CREATE DATABASE \"{dbName}\" WITH OWNER = postgres TEMPLATE = postgis ENCODING = 'UTF8' LOCALE_PROVIDER = 'libc' CONNECTION LIMIT = -1 IS_TEMPLATE = False;";
     defaultDbContext.Database.ExecuteSqlRaw(tsql);
}

using (tenantManager.CreateScope(tenantId))
using (var scope = serviceProvider.CreateScope())
{
    var tenantDbContext = scope.ServiceProvider.GetService<TenantDbContext>();
    if (tenantDbContext.Database.GetPendingMigrations().Any())
    {
        tenantDbContext.Database.Migrate();
    }
}

这里有几个问题需要说明以下。第一是DbContext.Database.Migrate()或者DbContext.Database.EnsureCreated()是能够自动创建数据库的,但是我需要存储空间数据,因此需要确保TEMPLATE = postgis,所以选择了手动创建。第二,DbContext.Database.Migrate()能够生效的前提是先手动Add-Migration,因此需要定期维护,比如提前知道大概有一百个租户,就得先提前加一百个迁移配置。

Add-Migration v0.0.1 -Context TenantDbContext -OutputDir Migrations\T1 -Args "--tenant 1"

与之匹配的,也需要在代码里将传入的参数与数据库和迁移配置代码对应起来。

var tenant = builder.Configuration.GetValue("Tenant", "1");
builder.Services.AddDbContext<TenantDbContext>((sp, b) =>
{
    var tenantManager = sp.GetRequiredService<ITenantManager>();
    var currentTenantContext = tenantManager.GetCurrentTenantContext();
    //如果有上下文那么创建租户dbcontext
    if (currentTenantContext != null)
    {
        var shardingRuntimeContext = currentTenantContext.GetShardingRuntimeContext();
        b.UseDefaultSharding<TenantDbContext>(shardingRuntimeContext);
    }
    //用来Add-Migration的代码
    if (args != null && args.Length > 0 && !string.IsNullOrEmpty(args[0]))
    {
        var connectionStr = builder.Configuration.GetConnectionString("NpgContext");
        var baseDbName = builder.Configuration["Tenant:BaseDb"];
        var dbName = builder.Configuration["Tenant:TenantDbTemp"].Replace("{tenant}", $"Tenant{tenant}");
        b.UseNpgsql(connectionStr.Replace(baseDbName, dbName), b => b.UseNetTopologySuite(geographyAsDefault: true))
                .UseMigrationNamespace(new NpqsqlMigrationNamespace($"T{tenant}"))
                .ReplaceService<IMigrationsAssembly, MultiDatabaseMigrationsAssembly>();
    }
});

6. 联库操作

上面说到,根据上述配置,普通租户只能访问他自己的数据库,但管理员或者登录接口需要能够访问所有的数据库,因此在某些情况下需要联库操作。好在联库操作是ShardingCore的天赋申通,只需要添加数据源(见本文第三部分)并配置路由就行。

public class VirtualDataSourceRoute<TEntity>: AbstractShardingOperatorVirtualDataSourceRoute<TEntity,string> where TEntity : TenantEntityBase
{
    private readonly DefaultDbContext _defaultDbContext;


    private readonly List<string> _dataSources = new List<string>()
    {
        
    };

    public override List<string> GetAllDataSourceNames()
    {
        return _dataSources;
    }

    public override bool AddDataSourceName(string dataSourceName)
    {
        if (_dataSources.Any(o => o == dataSourceName))
            return false;
        _dataSources.Add(dataSourceName);
        return true;
    }

    //我们设置区域就是数据库
    public override string ShardingKeyToDataSourceName(object shardingKey)
    {
        return $"tenant_{shardingKey}";
    }

    public override Func<string, bool> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
    {
        var t = ShardingKeyToDataSourceName(shardingKey);
        switch (shardingOperator)
        {
            case ShardingOperatorEnum.Equal: return tail => tail == t;
            default:
                {
                    return tail => true;
                }
        }
    }

    public override void Configure(EntityMetadataDataSourceBuilder<TEntity> builder)
    {
        builder.ShardingProperty(o => o.TenantId);
    }
}

另外每次系统启动时,以及新加入租户时,都需要给系统租户的IShardingRuntimeContext添加额外的数据源

//系统启动时
using (tenantManager.CreateScope(TenantConstant.SYSTEM_TENANT_ID))
{
    using (var scope = serviceProvider.CreateScope())
    {
        var shardingRuntimeContext = tenantManager.GetCurrentTenantContext().GetShardingRuntimeContext();
        foreach (var tenantId in tenantIds)
        {
            if (tenantId != TenantConstant.SYSTEM_TENANT_ID)
            {
                shardingRuntimeContext.GetDataSourceRouteManager().AddDataSourceName($"tenant_{tenantId}");
            }
        }
    }
}

//新增租户时
using (_tenantManager.CreateScope(TenantConstant.SYSTEM_TENANT_ID))
{
    using (var scope = _serviceProvider.CreateScope())
    {
        var sysShardingRuntimeContext = _tenantManager.GetCurrentTenantContext().GetShardingRuntimeContext();
        shardingRuntimeContext.GetDataSourceRouteManager().AddDataSourceName($"tenant_{tenant.Id}");
    }
}

7. 登录与验证

这部分就简单了,主要是生成JWT令牌组装Claims时将租户ID组装进去以及创建一个中间件,验证令牌以及租户ID后创建租户域。

/// <summary>
///组装Claims
/// </summary>
/// <param name="user"></param>
/// <returns></returns>
public static List<Claim> AddClaims(TokenModel user)
{
    var claims = new List<Claim>()
        {
            new(ClaimTypes.PrimarySid, user.UserId.ToString()),
            new(ClaimTypes.NameIdentifier, user.Phone.ToString()),
            new(ClaimTypes.Name, user.UserName),
            new(ClaimTypes.PrimaryGroupSid, user.TenantId.ToString()),
            new(ClaimTypes.GroupSid, user.DepartmentId.ToString()),
            new(ClaimTypes.UserData, JsonConvert.SerializeObject(user))
        };

    return claims;
}


public class TenantSelectMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ITenantManager _tenantManager;

    public TenantSelectMiddleware(RequestDelegate next, ITenantManager tenantManager)
    {
        _next = next;
        _tenantManager = tenantManager;
    }

    /// <summary>
    /// 1.中间件的方法必须叫Invoke,且为public,非static。
    /// 2.Invoke方法第一个参数必须是HttpContext类型。
    /// 3.Invoke方法必须返回Task。
    /// 4.Invoke方法可以有多个参数,除HttpContext外其它参数会尝试从依赖注入容器中获取。
    /// 5.Invoke方法不能有重载。
    /// </summary>
    /// Author : Napoleon
    /// Created : 2020/1/30 21:30
    public async Task Invoke(HttpContext context)
    {

        if (context.Request.Path.ToString().StartsWith("/api/tenant", StringComparison.CurrentCultureIgnoreCase))
        {
            if (!context.User.Identity.IsAuthenticated)
            {
                await _next(context);
                return;
            }

            var tenantIdStr = context.User.Claims.FirstOrDefault((o) => o.Type == ClaimTypes.PrimaryGroupSid)?.Value;
            if (string.IsNullOrWhiteSpace(tenantIdStr))
            {
                context.Response.StatusCode = 403;
                await context.Response.WriteAsync("租户不正确");
                return;
            }
            var tenantId = Convert.ToInt64(tenantIdStr);
            using (_tenantManager.CreateScope(tenantId))
            {
                await _next(context);
            }
        }
        else
        {
            await _next(context);
        }
    }
}

8. 总结

感谢ShardingCore,感谢薛家明大佬,让我在拒绝Furion后可以这么方便快捷的实现多租户。