掘金 阅读 ( ) • 2024-04-27 18:47

本次实验使用了sklearn中的数据s曲线,示意图如下:

image.png

在扩散模型中,需要定义在每一步所添加的噪声的β值,即其标准差。 让每一步的β呈递增状态,且范围规范至0到1之间

#制定每一步的β
betas=torch.linspace(-6,6,num_steps)
betas=torch.sigmoid(betas)*(0.5e-2 - 1e-5)+1e-5

很关键的一步:

image.png

就是扩散过程中任意时刻t相应的采样结果xt均可由x0和βt表示出来。

构造出相应的各参数(换元主要是方便看)

# 计算alpha各参数值
alphas=1-betas
# 连乘
alphas_prod=torch.cumprod(alphas,0)
alphas_bar_sqrt=torch.sqrt(alphas_prod)
one_minus_alphas_bar_log=torch.log(1-alphas_prod)
one_minus_alphas_bar_sqrt=torch.sqrt(1-alphas_prod)

进一步,写出q_x的表达式。这里使用了重参数技巧。

  • 就是将服从标准正态分布的一个值映射至服从任意正态分布的值,为了保证梯度的传输。
def q_x(x_0,t):
    # 生成服从正态分布的噪声
    noise=torch.randn_like(x_0)
    # 均值
    alphas_t=alphas_bar_sqrt[t]
    # 标准差
    alphas_l_m_t=one_minus_alphas_bar_sqrt[t]
    # 在x_0的基础上添加噪声
    return alphas_t * x_0 + alphas_l_m_t * noise 

然后演示一下,对原始s曲线加噪的图像效果。原本还是像样子的,后面就变成一团了。

image.png

下一步:构建逆扩散过程的高斯分布模型。

  • 这里用神经网络去拟合(以免逐步迭代增加计算量)。
  • 在这里写了一个简单的MLP对该过程进行拟合。

逆推过程,由xt推测xt-1时的状态。

image.png

网络训练目标

image.png image.png

image.png

训练过程中尽可能使μθ接近μt_hat。

  • Embedding(嵌入)在机器学习和自然语言处理领域中是一个常见的概念,通常指的是将数据映射到一个低维度的向量空间的过程。嵌入层的目标是将每个离散的项目(比如一个单词)映射到一个具有固定维度的实数向量,这个向量表示了项目在一个抽象空间中的位置。
# 保证最终的输入、输出维度一样
self.linears = nn.ModuleList([nn.Linear(2, num_units),
                              nn.ReLU(),
                              nn.Linear(num_units, num_units),
                              nn.ReLU(),
                              nn.Linear(num_units, num_units),
                              nn.ReLU(),
                              nn.Linear(num_units, 2)]
                             )
# 这些嵌入层将时间步 t 映射到与数据维度相同的空间中,以便与数据 x 进行结合。
self.step_embeddings = nn.ModuleList([
    nn.Embedding(n_steps, num_units),
    nn.Embedding(n_steps, num_units),
    nn.Embedding(n_steps, num_units)
])

# 由于采样时是随机的,需要额外的时间信息 t_embed
def forward(self, x_0, t):
    x = x_0
    for idx, embedding_layer in enumerate(self.step_embeddings):
        t_embed = embedding_layer(t)
        x = self.linears[2 * idx](x)
        # 这里嵌入时间信息,采用了直接加进去的方法,再通过一个线性层
        x += t_embed
        x = self.linears[2 * idx + 1](x)
    # 最后,将经过多次线性变换和时间嵌入的数据 x 通过 self.linears[-1] 进行最终的线性变换,输出模型的结果。
    x = self.linears[-1](x)
    return x

接着,需要编写训练损失函数。

def diffusion_loss_fn(model, x_0, alphas_bar_sqrt, one_minus_alphas_bar_sqrt, n_steps):
    """对任意时刻t进行采样,并计算loss"""
    batch_size = x_0.shape[0]

    # 对一个batchsize样本生成随机的时刻t,并且分成两段,尽可能覆盖到更多的t
    t = torch.randint(0, n_steps, (batch_size // 2,))
    # 拼接张量
    t = torch.cat([t, n_steps - 1 - t], dim=0)
    # 在张量的最后一个维度上增加一个维度。
    t = t.unsqueeze(-1)

    # x0的系数
    a = alphas_bar_sqrt[t]

    # eps的系数
    aml = one_minus_alphas_bar_sqrt[t]

    # 生成随机噪声
    e = torch.randn_like(x_0)

    # 构造模型的输入
    x = x_0 * a + e * aml

    # 送入模型,得到t时刻的随机噪声的预测值
    output = model(x, t.squeeze(-1))

    # 与真实噪声一起计算误差,求平均值
    loss = (e - output).square().mean()

    return loss

其实根据论文中的公式写就好了。对任意时刻t的loss计算,需要考虑初始状态x_0以及当前的噪声e。这两者相应的系数是随时间变化的(但是是预设的)。

生成包含时间步信息的张量 t的过程中,将总步骤n_steps分成了两段,将随机生成的一半长度的时间步与通过计算得到的另一半长度的时间步进行拼接。 image.png

由xt和t,模型预测该步添加的噪声。(去噪过程其实也有一定的随机性) image.png

# 逆扩散采样函数
def p_sample(model, x, t, betas, one_minus_alphas_bar_sqrt):
    # 从x[T]采样t时刻的重构值
    t = torch.tensor([t])

    coeff = betas[t] / one_minus_alphas_bar_sqrt[t]

    eps_theta = model(x, t)
    # 这里的均值是作者假设的
    mean = (1 - (1 - betas[t]).sqrt()) * (x - coeff * eps_theta)

    z = torch.randn_like(x)

    sigma_t = betas[t].sqrt()
    # 将标准正态分布转换成采样出来的正态分布
    sample = mean + z * sigma_t

    return sample



def p_sample_loop(model, shape, n_steps, betas, one_minus_alphas_bar_sqrt):
    """从x[T]恢复x[T-1]/X[T-2]/...X[0]"""

    cur_x = torch.randn(shape)
    # 创建一个列表
    x_seq = [cur_x]
    for i in reversed(range(n_steps)):
        cur_x = p_sample(model, cur_x, i, betas, one_minus_alphas_bar_sqrt)
        x_seq.append(cur_x)

    return x_seq

训练过程如下:


batch_size = 128
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
num_epochs = 4000
plt.rc('text', color='blue')

# 100步
model = MLP_Diffusion(num_steps)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for t in range(num_epochs):
    for idx, batch_x in enumerate(dataloader):

        loss = diffusion_loss_fn(model, batch_x, alphas_bar_sqrt, one_minus_alphas_bar_sqrt, num_steps)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(),1.)
        optimizer.step()

    # 每隔100步展示一下逆过程效果
    if t % 100 == 0:
        print(loss.data)
        x_seq = p_sample_loop(model, dataset.shape, num_steps, betas, one_minus_alphas_bar_sqrt)
        fig, axs = plt.subplots(1, 10, figsize=(28, 3))
        for i in range(1, 11):
            cur_x = x_seq[i * 10].detach()
            axs[i - 1].scatter(cur_x[:, 0], cur_x[:, 1], color='red', edgecolor='white')
            axs[i - 1].set_axis_off()
            axs[i - 1].set_title('$q(\mathbf{x}_{' + str(i*10) + '})$')
        plt.show()

中间过程效果图:

image.png

迭代的其实还是有点慢的。

第1000次的效果:其实已经有点s形状了

image.png

第2000次:

image.png

第3000次:

image.png

第4000次:

image.png