Machine Learning Hands-On Project Guide

Goal: turn theory into practical skills through hands-on projects. Principles: progress from simple to complex, and from reproduction to innovation.


Project Difficulty Tiers

Tier Overview

| Tier | Characteristics | Estimated time | Examples |
|------|-----------------|----------------|----------|
| ⭐ Beginner | Follow a tutorial end to end | 1-3 days | MNIST classification |
| ⭐⭐ Basic | Complete a standard task independently | 3-7 days | Image classification, sentiment analysis |
| ⭐⭐⭐ Intermediate | Integrate multiple modules | 1-2 weeks | Recommender system, QA chatbot |
| ⭐⭐⭐⭐ Advanced | End-to-end system / paper reproduction | 2-4 weeks | Transformer reproduction, RAG |
| ⭐⭐⭐⭐⭐ Expert | Original research / open-source contribution | 1 month+ | Published paper, core contribution |

Beginner Projects ⭐

Project 1: Handwritten Digit Recognition (MNIST)

Goal: understand the complete machine learning workflow

Tech stack: PyTorch, Matplotlib

Steps:

  1. Load the MNIST dataset
  2. Build a simple MLP (2-3 layers)
  3. Train and visualize the loss curve
  4. Evaluate accuracy
  5. Visualize predictions

Code template:

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
 
# Data loading
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])
 
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)
 
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000)
 
# Model
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layers = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 10)
        )
 
    def forward(self, x):
        x = self.flatten(x)
        return self.layers(x)
 
# Training
model = MLP()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
 
for epoch in range(10):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
 
    # Evaluate on the test set
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            pred = model(data).argmax(dim=1)
            correct += pred.eq(target).sum().item()
 
    print(f'Epoch {epoch}: Accuracy {100.*correct/len(test_dataset):.2f}%')

Expected result: 97%+ accuracy

Extension exercises:

  • How high can a CNN get? (expect 99%+)
  • Visualize the features learned by the convolution kernels
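For the CNN exercise, a minimal starting point might look like this (two conv blocks; the layer sizes are illustrative choices, not tuned values):

```python
import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """A minimal CNN for 28x28 grayscale MNIST images."""
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),              # 28x28 -> 14x14
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),              # 14x14 -> 7x7
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        return self.classifier(self.features(x))

model = SimpleCNN()
logits = model(torch.randn(8, 1, 28, 28))  # dummy batch to check shapes
```

The model drops into the training loop above unchanged, since the loop never assumes a flattened input.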

Project 2: House Price Prediction

Goal: understand regression tasks and feature engineering

Dataset: California Housing (the older Boston Housing dataset has been removed from recent scikit-learn releases)

Tech stack: Scikit-learn, Pandas

Steps:

  1. Exploratory data analysis (EDA)
  2. Feature engineering (missing values, scaling, encoding)
  3. Model comparison (linear regression, random forest, XGBoost)
  4. Hyperparameter tuning
  5. Model evaluation (MSE, R²)

Code template:

import pandas as pd
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, r2_score
 
# Load the data
housing = fetch_california_housing()
X, y = housing.data, housing.target
feature_names = housing.feature_names
 
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Standardize
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
 
# Compare models
models = {
    'Linear Regression': LinearRegression(),
    'Ridge': Ridge(alpha=1.0),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42)
}
 
for name, model in models.items():
    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_test_scaled)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)
    print(f'{name}: RMSE={rmse:.4f}, R²={r2:.4f}')

Extension exercises:

  • Compare against XGBoost/LightGBM
  • Run a feature importance analysis
  • Try different feature engineering approaches
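The feature-importance exercise can reuse the random forest's `feature_importances_` attribute; sketched here as a standalone snippet on synthetic data (with the real data, reuse X_train_scaled, y_train, and housing.feature_names from above):

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

# Synthetic stand-in for the housing features
X, y = make_regression(n_samples=500, n_features=8, n_informative=3, random_state=42)
feature_names = [f'feature_{i}' for i in range(X.shape[1])]

rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X, y)

# Importances are normalized to sum to 1; sort descending to rank features
ranking = sorted(zip(feature_names, rf.feature_importances_),
                 key=lambda t: t[1], reverse=True)
for name, imp in ranking:
    print(f'{name}: {imp:.3f}')
```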

Basic Projects ⭐⭐

Project 3: Image Classification (CIFAR-10)

Goal: master CNN architecture design and training techniques

Dataset: CIFAR-10 (10 classes, 60,000 32×32 images)

Tech stack: PyTorch, torchvision

Milestones:

| Model | Expected accuracy | Key techniques |
|-------|-------------------|----------------|
| Simple CNN | ~70% | basic convolution + pooling |
| VGG-style | ~80% | stacked small kernels |
| ResNet-18 | ~90% | residual connections |
| + data augmentation | ~92% | Cutout, Mixup |

Key code:

import torch
import torch.nn as nn
import torch.nn.functional as F
 
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
 
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
 
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        return F.relu(out)
 
class ResNet(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, 1, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
 
        self.layer1 = self._make_layer(64, 64, 2, stride=1)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)
 
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
 
    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        layers = [ResidualBlock(in_channels, out_channels, stride)]
        for _ in range(1, num_blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)
 
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        return self.fc(out)

Training tricks:

# Data augmentation
from torchvision import transforms

train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
 
# Learning-rate schedule
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)
 
# Mixed-precision training
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()

optimizer.zero_grad()
with autocast():
    outputs = model(inputs)
    loss = criterion(outputs, labels)

scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
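Mixup, listed in the milestone table, is not covered by the augmentation pipeline above. A minimal sketch of the standard formulation (mix each batch with a shuffled copy of itself using a Beta-distributed weight):

```python
import numpy as np
import torch

def mixup(x, y, alpha=1.0):
    """Mix a batch with a shuffled copy of itself.

    Returns mixed inputs, both label sets, and the mixing weight; the
    training loss becomes lam * CE(pred, y_a) + (1 - lam) * CE(pred, y_b).
    """
    lam = float(np.random.beta(alpha, alpha))
    index = torch.randperm(x.size(0))
    mixed_x = lam * x + (1 - lam) * x[index]
    return mixed_x, y, y[index], lam

images = torch.randn(16, 3, 32, 32)
labels = torch.randint(0, 10, (16,))
mixed, y_a, y_b, lam = mixup(images, labels)
```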

Project 4: Text Sentiment Analysis

Goal: understand the NLP pipeline and sequence models

Dataset: IMDB movie reviews (50,000 reviews)

Approaches:

| Method | Expected accuracy | Learning focus |
|--------|-------------------|----------------|
| TF-IDF + LR | ~88% | classical NLP |
| LSTM | ~87% | sequence modeling |
| Transformer | ~89% | attention mechanism |
| Fine-tuned BERT | ~94% | pretrained-model fine-tuning |
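The TF-IDF + LR baseline takes only a few lines of scikit-learn. A minimal sketch on a toy corpus (with IMDB you would pass in the review texts and 0/1 labels; the vectorizer settings are illustrative):

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

# Toy stand-in corpus; replace with the IMDB reviews and labels
texts = ["a wonderful, moving film", "terrible acting and a dull plot",
         "great performances all around", "boring and far too long"]
labels = [1, 0, 1, 0]  # 1 = positive, 0 = negative

clf = make_pipeline(
    TfidfVectorizer(ngram_range=(1, 2), max_features=50000),
    LogisticRegression(max_iter=1000),
)
clf.fit(texts, labels)
print(clf.predict(["a dull, boring film"]))
```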

LSTM implementation:

import torch
import torch.nn as nn
 
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=2,
                            batch_first=True, bidirectional=True, dropout=0.5)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)
        self.dropout = nn.Dropout(0.5)
 
    def forward(self, x):
        # x: [batch, seq_len]
        embedded = self.dropout(self.embedding(x))  # [batch, seq_len, embed_dim]
        _, (hidden, _) = self.lstm(embedded)
        # hidden: [num_layers*2, batch, hidden_dim]
        hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)  # concatenate the two directions
        return self.fc(self.dropout(hidden))

BERT fine-tuning:

from transformers import BertTokenizer, BertForSequenceClassification
from transformers import Trainer, TrainingArguments
 
# Load the pretrained model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
 
# Data processing
def tokenize_function(examples):
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=256)
 
# Training
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    learning_rate=2e-5,
    weight_decay=0.01,
)
 
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)
 
trainer.train()

Intermediate Projects ⭐⭐⭐

Project 5: Object Detection

Goal: build a real-time object detection system

Dataset: COCO / Pascal VOC

Roadmap:

  1. Start from a pretrained YOLOv8 model
  2. Fine-tune on a custom dataset
  3. Deploy as a real-time detection app

Using Ultralytics YOLOv8:

from ultralytics import YOLO
 
# Load a pretrained model
model = YOLO('yolov8n.pt')
 
# Train
results = model.train(
    data='coco128.yaml',
    epochs=100,
    imgsz=640,
    batch=16,
)
 
# Inference
results = model('image.jpg')
results[0].show()  # display the results
 
# Export
model.export(format='onnx')  # export to ONNX

Custom dataset layout:

dataset/
├── images/
│   ├── train/
│   └── val/
├── labels/
│   ├── train/
│   └── val/
└── data.yaml

Example data.yaml:

train: images/train
val: images/val
nc: 3  # number of classes
names: ['person', 'car', 'dog']
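For the labels/ directories above, each image gets a matching .txt file with one object per line in YOLO format: class index, then box center x/y and width/height, all normalized to [0, 1]. A small parser sketch:

```python
def parse_yolo_label(line):
    """Parse one line of a YOLO label file:
    '<class> <x_center> <y_center> <width> <height>' (normalized to [0, 1])."""
    parts = line.split()
    cls = int(parts[0])
    x, y, w, h = map(float, parts[1:])
    return cls, (x, y, w, h)

# One 'person' box centered in the image, 20% wide and 30% tall
cls, box = parse_yolo_label("0 0.5 0.5 0.2 0.3")
```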

Project 6: Intelligent QA Chatbot

Goal: build a RAG-based knowledge QA system

Tech stack: LangChain, OpenAI/Ollama, ChromaDB

Architecture:

User question
    ↓
[Vector retrieval] ←── Knowledge base (document embeddings)
    ↓
[Build prompt] (question + retrieved context)
    ↓
[LLM generates answer]
    ↓
Return the answer

Implementation:

from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
 
# 1. Load documents
loader = DirectoryLoader('./docs', glob='**/*.md')
documents = loader.load()
 
# 2. Split into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(documents)
 
# 3. Embed and store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(splits, embeddings, persist_directory='./chroma_db')
 
# 4. Build the QA chain
llm = ChatOpenAI(model_name='gpt-4', temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type='stuff',
    retriever=vectorstore.as_retriever(search_kwargs={'k': 3}),
    return_source_documents=True,
)
 
# 5. Ask questions
result = qa_chain({'query': 'What is a Transformer?'})
print(result['result'])
print('Sources:', [doc.metadata for doc in result['source_documents']])

Local deployment (with Ollama):

from langchain.llms import Ollama
 
# Use a local LLM
llm = Ollama(model='llama2')
 
# Use local embeddings
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')

Project 7: Recommender System

Goal: build a collaborative filtering + deep learning recommender

Dataset: MovieLens / Amazon Reviews

Approaches:

| Method | Characteristics | Best suited for |
|--------|-----------------|-----------------|
| User-based collaborative filtering | recommends via similar users | rich user-behavior data |
| Item-based collaborative filtering | recommends via similar items | rich item information |
| Matrix factorization (SVD) | latent-factor model | large-scale data |
| NCF (neural collaborative filtering) | deep learning | complex interactions |
| Two-tower model | retrieval stage | industrial-scale systems |

NCF implementation:

import torch
import torch.nn as nn
 
class NCF(nn.Module):
    def __init__(self, num_users, num_items, embed_dim=64, hidden_dims=[128, 64, 32]):
        super().__init__()
        # GMF branch
        self.user_embed_gmf = nn.Embedding(num_users, embed_dim)
        self.item_embed_gmf = nn.Embedding(num_items, embed_dim)
 
        # MLP branch
        self.user_embed_mlp = nn.Embedding(num_users, embed_dim)
        self.item_embed_mlp = nn.Embedding(num_items, embed_dim)
 
        mlp_layers = []
        input_dim = embed_dim * 2
        for hidden_dim in hidden_dims:
            mlp_layers.append(nn.Linear(input_dim, hidden_dim))
            mlp_layers.append(nn.ReLU())
            mlp_layers.append(nn.Dropout(0.2))
            input_dim = hidden_dim
        self.mlp = nn.Sequential(*mlp_layers)
 
        # Output layer
        self.output = nn.Linear(embed_dim + hidden_dims[-1], 1)
        self.sigmoid = nn.Sigmoid()
 
    def forward(self, user_ids, item_ids):
        # GMF
        user_gmf = self.user_embed_gmf(user_ids)
        item_gmf = self.item_embed_gmf(item_ids)
        gmf_out = user_gmf * item_gmf  # element-wise product
 
        # MLP
        user_mlp = self.user_embed_mlp(user_ids)
        item_mlp = self.item_embed_mlp(item_ids)
        mlp_input = torch.cat([user_mlp, item_mlp], dim=1)
        mlp_out = self.mlp(mlp_input)
 
        # Combine
        concat = torch.cat([gmf_out, mlp_out], dim=1)
        return self.sigmoid(self.output(concat)).squeeze()

Advanced Projects ⭐⭐⭐⭐

Project 8: Transformer from Scratch

Goal: deeply understand the attention mechanism

Reference notes: Transformer

Implementation checklist:

  • Multi-head self-attention
  • Positional encoding (sinusoidal / RoPE)
  • Encoder block
  • Decoder block (with causal mask)
  • Full encoder-decoder
  • Train on a machine translation task

Core code: see the Transformer notes.

Training data: WMT14 English-German translation dataset
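As a taste of the checklist, here is a minimal self-attention sketch (scaled dot-product attention plus a multi-head wrapper). It covers self-attention only; the decoder's cross-attention and the rest of the model still follow the Transformer notes. Dimensions are illustrative:

```python
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

def scaled_dot_product_attention(q, k, v, mask=None):
    # q, k, v: [batch, heads, seq_len, d_k]
    scores = q @ k.transpose(-2, -1) / math.sqrt(q.size(-1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))
    return F.softmax(scores, dim=-1) @ v

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.qkv = nn.Linear(d_model, 3 * d_model)  # fused Q/K/V projection
        self.out = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        b, t, d = x.shape
        q, k, v = self.qkv(x).chunk(3, dim=-1)
        # split into heads: [b, t, d] -> [b, heads, t, d_k]
        q, k, v = (z.view(b, t, self.num_heads, self.d_k).transpose(1, 2)
                   for z in (q, k, v))
        out = scaled_dot_product_attention(q, k, v, mask)
        # merge heads back: [b, heads, t, d_k] -> [b, t, d]
        return self.out(out.transpose(1, 2).reshape(b, t, d))

mha = MultiHeadAttention(d_model=64, num_heads=8)
y = mha(torch.randn(2, 10, 64))
```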


Project 9: LLM Fine-Tuning (LoRA)

Goal: fine-tune a large model with limited resources

Tech stack: PEFT, Hugging Face

Implementation:

from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import get_peft_model, LoraConfig, TaskType
import torch
 
# Load the base model
model_name = "meta-llama/Llama-2-7b-hf"
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    load_in_8bit=True,  # 8-bit quantization
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
 
# LoRA configuration
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,                    # LoRA rank
    lora_alpha=32,          # scaling factor
    lora_dropout=0.1,
    target_modules=["q_proj", "v_proj"],  # train only the Q and V projections
)
 
# Apply LoRA
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
# Prints: trainable params: 4,194,304 || all params: 6,742,609,920 || trainable%: 0.06%
 
# Train (with the SFT trainer)
from trl import SFTTrainer
 
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    dataset_text_field="text",
    max_seq_length=512,
    tokenizer=tokenizer,
    args=training_args,
)
 
trainer.train()

Memory estimates:

| Model size | Full fine-tuning | QLoRA (4-bit) |
|------------|------------------|---------------|
| 7B | 28 GB | 6 GB |
| 13B | 52 GB | 10 GB |
| 70B | 280 GB | 35 GB |
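The trainable-parameter figure printed by print_trainable_parameters() above can be sanity-checked by hand: each LoRA adapter on a d_in→d_out projection adds r·(d_in + d_out) parameters (matrices A: r×d_in and B: d_out×r). Assuming Llama-2-7B's 32 layers with 4096-dimensional q_proj and v_proj:

```python
def lora_param_count(r, d_in, d_out, num_modules):
    """Parameters added by LoRA adapters: each adapter is a pair of
    low-rank matrices A (r x d_in) and B (d_out x r)."""
    return num_modules * r * (d_in + d_out)

# Llama-2-7B: 32 layers x 2 target modules (q_proj, v_proj), each 4096 -> 4096
n = lora_param_count(r=8, d_in=4096, d_out=4096, num_modules=32 * 2)
print(n)  # 4194304 -- matches the printed trainable-param count above
```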

Project 10: Image Generation with Diffusion Models

Goal: understand how generative models work

Tech stack: Diffusers, PyTorch

Using Stable Diffusion:

from diffusers import StableDiffusionPipeline
import torch
 
# Load the model
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16,
)
pipe = pipe.to("cuda")
 
# Generate an image
prompt = "a photo of a cat sitting on a rainbow, digital art"
image = pipe(prompt).images[0]
image.save("cat_rainbow.png")

Training your own diffusion model:

import torch
import torch.nn.functional as F
from diffusers import DDPMPipeline, DDPMScheduler
from diffusers import UNet2DModel
from accelerate import Accelerator

# Configure the model
model = UNet2DModel(
    sample_size=64,
    in_channels=3,
    out_channels=3,
    layers_per_block=2,
    block_out_channels=(128, 256, 256, 512),
    down_block_types=(
        "DownBlock2D", "DownBlock2D", "AttnDownBlock2D", "DownBlock2D",
    ),
    up_block_types=(
        "UpBlock2D", "AttnUpBlock2D", "UpBlock2D", "UpBlock2D",
    ),
)
 
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
 
# Training loop
for epoch in range(num_epochs):
    for batch in dataloader:
        images = batch['images']

        # Add noise
        noise = torch.randn_like(images)
        timesteps = torch.randint(0, 1000, (images.shape[0],))
        noisy_images = noise_scheduler.add_noise(images, noise, timesteps)

        # Predict the noise
        noise_pred = model(noisy_images, timesteps).sample

        # Compute the loss and update
        optimizer.zero_grad()
        loss = F.mse_loss(noise_pred, noise)
        loss.backward()
        optimizer.step()

Competition Practice

Kaggle Starter Competitions

| Competition | Task type | Difficulty | Why recommended |
|-------------|-----------|------------|-----------------|
| Titanic | classification | ⭐ | end-to-end ML workflow |
| House Prices | regression | ⭐ | feature engineering practice |
| Digit Recognizer | classification | ⭐ | CNN basics |
| Dogs vs Cats | image classification | ⭐⭐ | transfer learning practice |
| NLP with Disaster Tweets | NLP | ⭐⭐ | NLP basics |

Kaggle Intermediate Competitions

| Competition type | Key techniques | Recommendation |
|------------------|----------------|----------------|
| Tabular data | XGBoost/LightGBM, feature engineering | ⭐⭐⭐⭐⭐ |
| Image classification | EfficientNet, data augmentation | ⭐⭐⭐⭐ |
| NLP | BERT fine-tuning, model ensembling | ⭐⭐⭐⭐ |
| Time series | LSTM, Prophet, feature engineering | ⭐⭐⭐ |

Competition Tips

Score-boosting strategies:

  1. Do EDA thoroughly: understand the data distribution
  2. Feature engineering: the key to tabular competitions
  3. Cross-validation: evaluate models correctly
  4. Model ensembling: stacking, blending
  5. Post-processing: threshold optimization, rule-based fixes

Code template:

# K-fold cross-validation
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np
 
def train_with_kfold(X, y, model_fn, n_splits=5):
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    oof_preds = np.zeros(len(X))
 
    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
 
        model = model_fn()
        model.fit(X_train, y_train)
 
        oof_preds[val_idx] = model.predict_proba(X_val)[:, 1]
        print(f'Fold {fold}: AUC = {roc_auc_score(y_val, oof_preds[val_idx]):.4f}')
 
    print(f'Overall: AUC = {roc_auc_score(y, oof_preds):.4f}')
    return oof_preds
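Blending, from the strategy list above, can be as simple as a weighted average of out-of-fold predictions from different models. A sketch with synthetic stand-in predictions (in practice the weights are tuned against the OOF scores):

```python
import numpy as np
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(42)
y_true = rng.integers(0, 2, size=1000)

# Stand-ins for two models' out-of-fold probabilities: noisy versions
# of the label, so each carries some signal
oof_a = np.clip(y_true + rng.normal(0, 0.6, 1000), 0, 1)
oof_b = np.clip(y_true + rng.normal(0, 0.6, 1000), 0, 1)

blend = 0.5 * oof_a + 0.5 * oof_b  # equal weights for simplicity
for name, pred in [('model A', oof_a), ('model B', oof_b), ('blend', blend)]:
    print(f'{name}: AUC = {roc_auc_score(y_true, pred):.4f}')
```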

Project Management Best Practices

Project Structure Template

project/
├── data/
│   ├── raw/                # raw data
│   ├── processed/          # processed data
│   └── external/           # external data
├── notebooks/
│   ├── 01_eda.ipynb        # exploratory analysis
│   ├── 02_baseline.ipynb   # baseline model
│   └── 03_experiments.ipynb # experiments
├── src/
│   ├── data/               # data processing
│   ├── models/             # model definitions
│   ├── training/           # training logic
│   └── utils/              # utilities
├── configs/                # configuration files
├── outputs/                # outputs (models, logs)
├── requirements.txt        # dependencies
└── README.md               # project description
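Files under configs/ can be loaded into a typed config object at startup. A minimal sketch using JSON and a dataclass (the file name and fields are illustrative; YAML works just as well):

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path

@dataclass
class TrainConfig:
    learning_rate: float = 1e-3
    batch_size: int = 32
    epochs: int = 100

def load_config(path):
    """Read a JSON config file and override the dataclass defaults."""
    overrides = json.loads(Path(path).read_text())
    return TrainConfig(**overrides)

# Round-trip a sample config through a temp file
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    f.write(json.dumps({'batch_size': 64}))
cfg = load_config(f.name)
```

Keeping defaults in one dataclass means every experiment diff is just the overrides file.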

Experiment Tracking

Using Weights & Biases:

import wandb
 
# Initialize
wandb.init(project='my-project', config={
    'learning_rate': 1e-3,
    'batch_size': 32,
    'epochs': 100,
})
 
# Log metrics
for epoch in range(epochs):
    train_loss = train()
    val_loss, val_acc = validate()
 
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'val_acc': val_acc,
    })
 
# Save the model
wandb.save('model.pth')

Related Notes


Last updated: 2026-01-04 · Maintainer: sean2077