求助,transformer预测值相同

code_text
```import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class TransformerRegressionModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super(TransformerRegressionModel, self).__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.TransformerEncoderLayer) or isinstance(m, nn.TransformerDecoderLayer):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        src = self.pos_encoder(src)  # (sequence_length, batch_size, input_dim)
        tgt = self.pos_encoder(tgt)  # (sequence_length, batch_size, input_dim)

        memory = self.transformer_encoder(src)  # (sequence_length, batch_size, input_dim)
        out = self.transformer_decoder(tgt, memory)  # (sequence_length, batch_size, input_dim)
        out = self.output_layer(out)  # 输出层
        return out


# 获取数据
file_path = "C:\\python\\soc_model\\train_data\\train1"
data = pd.read_csv(file_path)

# 特征和目标
x = data[['Current_(mA)', 'Voltage_(V)']].values
scaler = StandardScaler()
y_true = data['Capacity_(Ah)'].values
x_normalized = scaler.fit_transform(np.abs(x))  # Normalize the feature data

# Convert to PyTorch tensors
x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
y_tensor = torch.tensor(y_true, dtype=torch.float32).view(-1, 1)

# 创建数据集
train_dataset = TensorDataset(x_tensor, y_tensor)

# 创建数据加载器
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)

# 超参数配置
input_dim = 2  # 输入特征数量
output_dim = 1  # 输出特征数量
num_heads = 2  # 注意力头数量
num_layers = 6
num_epochs = 100

# 检查 GPU 可用性
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 初始化模型
model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.000001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5, mode='min')

# 提前停止策略
patience = 5
best_loss = float('inf')
patience_counter = 0

# 训练模型
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    previous_y = None  # 保存上一时刻的预测值
    predicted = []
    for i, (current_x, current_y) in enumerate(train_dataloader):

        current_x, current_y = current_x.to(device), current_y.to(device)

        # 如果需要,可以调整 current_x 的维度
        current_x = current_x.unsqueeze(0)  # (1, batch_size, input_dim)

        if i == 0:
            # 第一个时间步,初始化目标序列 tgt 为 0
            tgt = torch.zeros_like(current_y).unsqueeze(0).to(device)  # (1, batch_size, input_dim)
        else:
            # 使用上一时刻的预测值作为目标序列
            tgt = previous_y.unsqueeze(0)


        # 使用当前时刻的输入和上一时刻的输出进行预测
        predicted_y = model(current_x, tgt)
        print(predicted_y)
        predicted.append(predicted_y)



        # 保存当前时刻的预测值以供下一时刻使用
        previous_y = predicted_y.squeeze(0).detach()

        # 计算损失
        loss = criterion(predicted_y, current_y.unsqueeze(0))
        epoch_loss += loss.item()

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    scheduler.step(epoch_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")

    if epoch_loss < best_loss:
        best_loss = epoch_loss
        patience_counter = 0
        torch.save(model.state_dict(), "transformer_model.pth1")
        print("Model saved.")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

code_text

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class TransformerRegressionModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super(TransformerRegressionModel, self).__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.TransformerEncoderLayer) or isinstance(m, nn.TransformerDecoderLayer):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        src = self.pos_encoder(src)  # (sequence_length, batch_size, input_dim)
        tgt = self.pos_encoder(tgt)  # (sequence_length, batch_size, input_dim)

        memory = self.transformer_encoder(src)  # (sequence_length, batch_size, input_dim)
        out = self.transformer_decoder(tgt, memory)  # (sequence_length, batch_size, input_dim)
        out = self.output_layer(out)  # 输出层
        return out


# 获取数据
file_path = "C:\\python\\soc_model\\train_data\\train5"
data = pd.read_csv(file_path)

# 特征和目标
x = data[['Current_(mA)', 'Voltage_(V)']].values
scaler_x = StandardScaler()
x_normalized = scaler_x.fit_transform(x)

x = np.abs(x)

y_true = data['Capacity_(Ah)'].values

# 将数据转换为 PyTorch 张量
x_tensor = torch.tensor(x_normalized, dtype=torch.float32)


# 创建数据集
test_dataset = TensorDataset(x_tensor)

# 创建数据加载器
test_data = DataLoader(test_dataset, batch_size=1, shuffle=False)

# 超参数配置
input_dim = 2  # 输入特征数量
output_dim = 1  # 输出特征数量
num_heads = 2  # 注意力头数量
num_layers = 6
num_epochs = 100

# 检查 GPU 可用性
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 初始化模型
model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)
model.load_state_dict(torch.load('transformer_model.pth1'))





# 预测流程
# 推理
predicted = []

with torch.no_grad():
    for i, (current_x,) in enumerate(test_data):
        current_x = current_x.to(device)
        current_x = current_x.unsqueeze(1)  # 增加 seq_len 维度

        # 初始化目标序列 tgt
        if i == 0:
            tgt = torch.zeros(current_x.size(0), 1, current_x.size(2)).to(device)
        else:
            tgt = predicted_y.unsqueeze(1)  # 作为下一步的目标序列

        # 执行预测
        predicted_y = model(current_x, tgt)
        print(current_x)
        print(tgt)


        # 提取最后一个时间步的预测值
        predicted_y = predicted_y[:, -1, :]  # (batch_size, output_dim)

        # 保存预测结果
        predicted.append(predicted_y.cpu().numpy())

# 转换为 NumPy 数组
predicted = np.array(predicted).squeeze()



**真心求助,为什么预测时输出的结果的相同**