Initial commit

This commit is contained in:
2026-03-26 18:49:14 +08:00
commit ec8bffe26e
6 changed files with 1723 additions and 0 deletions

7
.env.example Normal file
View File

@@ -0,0 +1,7 @@
# B站请求头配置
# 直接复制浏览器请求头里的整段 Cookie 原文,不要手工拼接
# 推荐至少包含 SESSDATA、bili_jct、DedeUserID、buvid3、buvid4
BILI_COOKIE=SESSDATA=你的SESSDATA; bili_jct=你的bili_jct; DedeUserID=你的DedeUserID; DedeUserID__ckMd5=你的ckMd5; buvid3=你的buvid3; buvid4=你的buvid4;
# 建议与浏览器保持一致,可从同一请求头复制 User-Agent
BILI_USER_AGENT=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36

9
.gitignore vendored Normal file
View File

@@ -0,0 +1,9 @@
venv/
__pycache__/
*.pyc
.env
outputs/
data/*.csv
.vscode/
.idea/
.DS_Store

135
README.md Normal file
View File

@@ -0,0 +1,135 @@
# 比亚迪汽车视频评论情感分析项目
## 1. 项目说明
本项目按实验流程完成以下任务:
- 评论清洗、去重、分词与停用词过滤
- SnowNLP情感标注与可视化分析
- 两种TF-IDF方案+朴素贝叶斯建模对比
- 加入时间与点赞特征后再次训练评估
- src/byd_sentiment_pipeline.py: 主流程脚本
- cn_stopwords.txt: 停用词表
- .env.example: 环境变量模板
- data/: 中间数据与结果CSV
- outputs/: 图像与分析报告
## 2. 环境准备
1. 创建并激活虚拟环境
2. 安装依赖
```bash
pip install -r requirements.txt
```
## 3. Cookie配置
复制模板生成.env文件
```bash
cp .env.example .env
```
在.env中填写真实Cookie。Cookie不会写进代码。
推荐按下面步骤获取,成功率更高:
1. 浏览器登录B站账号打开任意一个目标视频页面并停留10秒。
2. 按F12打开开发者工具切到Network。
3. 刷新页面后在请求列表中选中任意一个发往api.bilibili.com的请求常见如x/v2/reply、x/web-interface/view
4. 在Headers里复制两项
- Request Headers -> Cookie 整段值粘贴到BILI_COOKIE。
- Request Headers -> User-Agent粘贴到BILI_USER_AGENT。
5. 确保Cookie是一整行不要换行不要多余引号。
示例(仅演示格式):
```env
BILI_COOKIE=SESSDATA=xxxx; bili_jct=xxxx; DedeUserID=xxxx; DedeUserID__ckMd5=xxxx; buvid3=xxxx; buvid4=xxxx;
BILI_USER_AGENT=Mozilla/5.0 (...) Chrome/124.0.0.0 Safari/537.36
```
先做自检再爬取:
```bash
python - <<'PY'
import os
from dotenv import load_dotenv
load_dotenv('.env')
cookie = os.getenv('BILI_COOKIE', '')
ua = os.getenv('BILI_USER_AGENT', '')
print('Cookie已配置:', bool(cookie), '长度:', len(cookie))
print('UA已配置:', bool(ua), '长度:', len(ua))
PY
```
如果出现412或-352
1. 重新复制一次最新Cookie过期很常见
2. 降低抓取频率增大sleep参数
3. 避免同一时间多终端并发跑爬虫。
## 5. 运行方式
全流程执行:
```bash
python src/byd_sentiment_pipeline.py all --target-comments 4500 --min-comments 4000
```
分阶段执行:
```bash
python src/byd_sentiment_pipeline.py crawl --target-comments 4500 --min-comments 4000
python src/byd_sentiment_pipeline.py preprocess
python src/byd_sentiment_pipeline.py explore
python src/byd_sentiment_pipeline.py model
```
说明:
1. crawl阶段默认支持断点续抓。若data/byd_comments_raw.csv已存在会自动读取并按评论ID/评论内容+时间+BV号去重后继续抓取。
2. 因为B站接口可能间歇返回412建议多轮执行crawl累积到目标条数不会重复叠加历史评论。
爬虫建议使用更慢参数减少412概率
```bash
python src/byd_sentiment_pipeline.py crawl --target-comments 4500 --min-comments 4000 --sleep-min 1.2 --sleep-max 2.5
```
如果想自动多轮累积每轮都会基于历史CSV去重后续抓可使用
```bash
python src/byd_sentiment_pipeline.py crawl --target-comments 4500 --min-comments 4000 --sleep-min 1.2 --sleep-max 2.5 --rounds 6 --round-cooldown 90
```
## 5.1 B站场景停用词补充
如果词云或top_words中出现“回复、展开、置顶”等平台噪声词可直接在cn_stopwords.txt中每行追加一个词然后重新执行
```bash
python src/byd_sentiment_pipeline.py preprocess
python src/byd_sentiment_pipeline.py explore
```
## 6. 输出文件
- data/byd_comments_raw.csv
- data/byd_comments_preprocessed.csv
- data/byd_comments_labeled.csv
- outputs/overall_wordcloud.png
- outputs/负面\_wordcloud.png
- outputs/中性\_wordcloud.png
- outputs/正面\_wordcloud.png
- outputs/sentiment_pie.png
- outputs/monthly_trend.png
- outputs/top10_likes_bar.png
- outputs/confusion_method1_default.png
- outputs/confusion_method2_improved.png
- outputs/confusion_method2_plus_features.png
- outputs/model_metrics_summary.csv
- outputs/exploration_report.md
- outputs/model_report.md

761
cn_stopwords.txt Normal file
View File

@@ -0,0 +1,761 @@
$
0
1
2
3
4
5
6
7
8
9
?
_
一些
一何
一切
一则
一方面
一旦
一来
一样
一般
一转眼
万一
上下
不仅
不但
不光
不单
不只
不外乎
不如
不妨
不尽
不尽然
不得
不怕
不惟
不成
不拘
不料
不是
不比
不然
不特
不独
不管
不至于
不若
不论
不过
不问
与其
与其说
与否
与此同时
且不说
且说
两者
个别
为了
为什么
为何
为止
为此
为着
乃至
乃至于
之一
之所以
之类
乌乎
也好
也罢
二来
于是
于是乎
云云
云尔
人们
人家
什么
什么样
介于
仍旧
从此
从而
他人
他们
以上
以为
以便
以免
以及
以故
以期
以来
以至
以至于
以致
任何
任凭
似的
但凡
但是
何以
何况
何处
何时
余外
作为
你们
使
使得
例如
依据
依照
便于
俺们
倘使
倘或
倘然
倘若
假使
假如
假若
傥然
先不先
光是
全体
全部
关于
其一
其中
其二
其他
其余
其它
其次
具体地说
具体说来
兼之
再其次
再则
再有
再者
再者说
再说
况且
几时
凡是
凭借
出于
出来
分别
则甚
别人
别处
别是
别的
别管
别说
前后
前此
前者
加之
加以
即令
即使
即便
即如
即或
即若
又及
及其
及至
反之
反而
反过来
反过来说
受到
另一方面
另外
另悉
只当
只怕
只是
只有
只消
只要
只限
叮咚
可以
可是
可见
各个
各位
各种
各自
同时
后者
向使
向着
否则
吧哒
呜呼
呵呵
呼哧
咱们
哈哈
哎呀
哎哟
哪个
哪些
哪儿
哪天
哪年
哪怕
哪样
哪边
哪里
哼唷
唯有
啪达
啷当
喔唷
嗡嗡
嘎登
嘿嘿
因为
因了
因此
因着
因而
固然
在下
在于
基于
处在
多么
多少
大家
她们
如上
如上所述
如下
如何
如其
如同
如是
如果
如此
如若
始而
孰料
孰知
宁可
宁愿
宁肯
它们
对于
对待
对方
对比
尔后
尔尔
尚且
就是
就是了
就是说
就算
就要
尽管
尽管如此
岂但
已矣
巴巴
并且
并非
庶乎
庶几
开外
开始
归齐
当地
当然
当着
彼时
彼此
得了
怎么
怎么办
怎么样
怎奈
怎样
总之
总的来看
总的来说
总的说来
总而言之
恰恰相反
惟其
慢说
我们
或则
或是
或曰
或者
截至
所以
所在
所幸
所有
才能
打从
抑或
按照
换句话说
换言之
据此
接着
故此
故而
旁人
无宁
无论
既往
既是
既然
时候
是以
是的
替代
有些
有关
有及
有时
有的
朝着
本人
本地
本着
本身
来着
来自
来说
极了
果然
果真
某个
某些
某某
根据
正值
正如
正巧
正是
此地
此处
此外
此时
此次
此间
毋宁
每当
比及
比如
比方
没奈何
沿
沿着
漫说
然则
然后
然而
照着
犹且
犹自
甚且
甚么
甚或
甚而
甚至
甚至于
用来
由于
由是
由此
由此可见
的确
的话
直到
相对而言
省得
眨眼
着呢
矣乎
矣哉
竟而
等到
等等
简言之
类如
紧接着
纵令
纵使
纵然
经过
结果
继之
继后
继而
综上所述
罢了
而且
而况
而后
而外
而已
而是
而言
能否
自个儿
自从
自各儿
自后
自家
自己
自打
自身
至于
至今
至若
般的
若夫
若是
若果
若非
莫不然
莫如
莫若
虽则
虽然
虽说
要不
要不是
要不然
要么
要是
譬喻
譬如
许多
设使
设或
设若
诚如
诚然
说来
诸位
诸如
谁人
谁料
谁知
贼死
赖以
起见
趁着
越是
较之
还是
还有
还要
这一来
这个
这么
这么些
这么样
这么点儿
这些
这会儿
这儿
这就是说
这时
这样
这次
这般
这边
这里
进而
连同
逐步
通过
遵循
遵照
那个
那么
那么些
那么样
那些
那会儿
那儿
那时
那样
那般
那边
那里
鄙人
鉴于
针对
除了
除外
除开
除此之外
除非
随后
随时
随着
难道说
非但
非徒
非特
非独
顺着
首先
回复
展开
收起
置顶
原帖
楼主
层主
博主
评论区
弹幕
转发
点赞
链接
网页链接
视频链接

11
requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
pandas>=2.1
numpy>=1.26
requests>=2.31
python-dotenv>=1.0
jieba>=0.42.1
snownlp>=0.12.3
wordcloud>=1.9.3
matplotlib>=3.8
scikit-learn>=1.4
scipy>=1.12
tqdm>=4.66

View File

@@ -0,0 +1,800 @@
import argparse
import math
import os
import random
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
import jieba
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from matplotlib import font_manager
from scipy.sparse import csr_matrix, hstack
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.metrics import (
accuracy_score,
classification_report,
confusion_matrix,
precision_recall_fscore_support,
)
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.preprocessing import MinMaxScaler
from snownlp import SnowNLP
from tqdm import tqdm
from wordcloud import WordCloud
RAW_COMMENT_COLUMNS = ["评论内容", "评论时间", "点赞数", "BV号", "AV号", "评论ID", "是否子评论"]
def get_project_root() -> Path:
return Path(__file__).resolve().parents[1]
def ensure_dirs(project_root: Path) -> Dict[str, Path]:
data_dir = project_root / "data"
output_dir = project_root / "outputs"
data_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)
return {
"data": data_dir,
"outputs": output_dir,
"raw_csv": data_dir / "byd_comments_raw.csv",
"preprocessed_csv": data_dir / "byd_comments_preprocessed.csv",
"labeled_csv": data_dir / "byd_comments_labeled.csv",
}
def detect_chinese_font() -> Tuple[str, str]:
candidate_paths = [
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/opentype/noto/NotoSerifCJK-Regular.ttc",
"/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/truetype/arphic/uming.ttc",
"/System/Library/Fonts/PingFang.ttc",
"C:/Windows/Fonts/msyh.ttc",
]
for font_path in candidate_paths:
p = Path(font_path)
if p.exists():
font_name = font_manager.FontProperties(fname=str(p)).get_name()
return str(p), font_name
preferred_names = [
"Noto Sans CJK SC",
"Noto Serif CJK SC",
"WenQuanYi Zen Hei",
"SimHei",
"Microsoft YaHei",
]
available = {f.name: f.fname for f in font_manager.fontManager.ttflist}
for font_name in preferred_names:
if font_name in available:
return available[font_name], font_name
raise RuntimeError("未检测到可用中文字体。请安装Noto Sans CJK或文泉驿字体后重试。")
def setup_matplotlib(font_name: str) -> None:
plt.rcParams["font.sans-serif"] = [
font_name,
"Noto Sans CJK SC",
"WenQuanYi Zen Hei",
"SimHei",
]
plt.rcParams["axes.unicode_minus"] = False
class BilibiliCrawler:
def __init__(self, cookie: str = "", user_agent: str = "") -> None:
self.session = requests.Session()
headers = {
"User-Agent": user_agent
or "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Referer": "https://www.bilibili.com/",
"Origin": "https://www.bilibili.com",
"Accept": "application/json, text/plain, */*",
}
if cookie:
headers["Cookie"] = cookie
self.session.headers.update(headers)
def _request_json(self, url: str, params: Dict, retries: int = 5) -> Dict:
for idx in range(retries):
try:
resp = self.session.get(url, params=params, timeout=15)
resp.raise_for_status()
data = resp.json()
code = data.get("code", 0)
if code == 0:
return data
if code == -352:
raise RuntimeError(
"触发风控(-352)。请在.env中补充有效Cookie并降低抓取速度。"
)
if code in {-412, -509}:
raise RuntimeError(f"接口返回风控码: {code}")
if code != 0:
raise RuntimeError(f"接口返回异常: code={code}, message={data.get('message')}")
except Exception:
if idx == retries - 1:
raise
sleep_sec = 1.0 + idx * 1.2 + random.random()
time.sleep(sleep_sec)
raise RuntimeError("请求失败")
def get_aid(self, bvid: str) -> int:
url = "https://api.bilibili.com/x/web-interface/view"
data = self._request_json(url, {"bvid": bvid})
return int(data["data"]["aid"])
def get_comment_page(self, aid: int, pn: int, ps: int = 20, sort: int = 2) -> Dict:
url = "https://api.bilibili.com/x/v2/reply"
params = {
"type": 1,
"oid": aid,
"pn": pn,
"ps": ps,
"sort": sort,
}
return self._request_json(url, params)
def parse_reply(reply: Dict, bvid: str, aid: int, is_sub_reply: bool) -> Dict:
ctime = reply.get("ctime")
time_str = ""
if ctime:
time_str = datetime.fromtimestamp(int(ctime)).strftime("%Y-%m-%d %H:%M:%S")
return {
"评论内容": (reply.get("content") or {}).get("message", "").strip(),
"评论时间": time_str,
"点赞数": int(reply.get("like") or 0),
"BV号": bvid,
"AV号": aid,
"评论ID": str(reply.get("rpid_str") or reply.get("rpid") or ""),
"是否子评论": int(is_sub_reply),
}
def iter_page_comments(replies: List[Dict], bvid: str, aid: int) -> Iterable[Dict]:
for reply in replies or []:
yield parse_reply(reply, bvid, aid, is_sub_reply=False)
for sub in (reply.get("replies") or []):
yield parse_reply(sub, bvid, aid, is_sub_reply=True)
def normalize_raw_comments(df: pd.DataFrame) -> pd.DataFrame:
if df is None or df.empty:
return pd.DataFrame(columns=RAW_COMMENT_COLUMNS)
for col in RAW_COMMENT_COLUMNS:
if col not in df.columns:
df[col] = ""
out = df[RAW_COMMENT_COLUMNS].copy()
out["评论内容"] = out["评论内容"].fillna("").astype(str).str.strip()
out["评论时间"] = out["评论时间"].fillna("").astype(str).str.strip()
out["BV号"] = out["BV号"].fillna("").astype(str).str.strip()
out["评论ID"] = out["评论ID"].fillna("").astype(str).str.strip()
out["点赞数"] = pd.to_numeric(out["点赞数"], errors="coerce").fillna(0).astype(int)
out["AV号"] = pd.to_numeric(out["AV号"], errors="coerce").fillna(0).astype(int)
out["是否子评论"] = pd.to_numeric(out["是否子评论"], errors="coerce").fillna(0).astype(int)
out = out[out["评论内容"].str.len() > 0].copy()
return out
def deduplicate_raw_comments(df: pd.DataFrame) -> pd.DataFrame:
df = normalize_raw_comments(df)
if df.empty:
return df
has_id = df["评论ID"].str.len() > 0
part_with_id = df[has_id].drop_duplicates(subset=["评论ID"], keep="first")
part_without_id = df[~has_id].drop_duplicates(
subset=["评论内容", "评论时间", "BV号"], keep="first"
)
merged = pd.concat([part_with_id, part_without_id], ignore_index=True)
merged = merged.drop_duplicates(
subset=["评论ID", "评论内容", "评论时间", "BV号"], keep="first"
)
return merged[RAW_COMMENT_COLUMNS]
def load_existing_raw_comments(out_csv: Path) -> pd.DataFrame:
if not out_csv.exists():
return pd.DataFrame(columns=RAW_COMMENT_COLUMNS)
try:
old = pd.read_csv(out_csv)
except Exception as exc:
print(f"读取已有评论文件失败,忽略历史文件并重抓: {exc}")
return pd.DataFrame(columns=RAW_COMMENT_COLUMNS)
old = deduplicate_raw_comments(old)
old.to_csv(out_csv, index=False, encoding="utf-8-sig")
return old
def build_seen_sets(df: pd.DataFrame) -> Tuple[Set[str], Set[Tuple[str, str, str]]]:
if df.empty:
return set(), set()
ids = {x for x in df["评论ID"].astype(str).tolist() if x}
keys = {
(str(r["评论内容"]), str(r["评论时间"]), str(r["BV号"]))
for _, r in df[["评论内容", "评论时间", "BV号"]].iterrows()
}
return ids, keys
def crawl_comments(
bvids: List[str],
out_csv: Path,
target_comments: int,
min_comments: int,
sleep_min: float,
sleep_max: float,
rounds: int,
round_cooldown: float,
) -> pd.DataFrame:
load_dotenv(get_project_root() / ".env")
cookie = os.getenv("BILI_COOKIE", "")
user_agent = os.getenv("BILI_USER_AGENT", "")
crawler = BilibiliCrawler(cookie=cookie, user_agent=user_agent)
existing_df = load_existing_raw_comments(out_csv)
rows: List[Dict] = existing_df.to_dict("records")
seen_ids, seen_keys = build_seen_sets(existing_df)
newly_added = 0
if len(rows) > 0:
print(f"检测到已有评论 {len(rows)} 条,启用断点续抓与去重累积。")
if len(rows) >= target_comments:
print(f"已有评论已达到目标条数({target_comments}),跳过抓取。")
return existing_df
total_rounds = max(1, int(rounds))
for round_idx in range(1, total_rounds + 1):
print(f"开始抓取第{round_idx}/{total_rounds}轮,目标评论数: {target_comments}")
for bvid in bvids:
try:
aid = crawler.get_aid(bvid)
first_page = crawler.get_comment_page(aid=aid, pn=1, ps=20, sort=2)
total_count = int((first_page.get("data") or {}).get("page", {}).get("count", 0))
total_pages = max(1, math.ceil(total_count / 20))
except Exception as exc:
print(f"跳过视频 {bvid}: 获取首屏失败 -> {exc}")
continue
print(f"视频 {bvid} (aid={aid}) 评论总量约: {total_count},分页: {total_pages}")
page_range = range(1, total_pages + 1)
for pn in tqdm(page_range, desc=f"{round_idx}轮抓取 {bvid}", ncols=88):
if pn == 1:
page_data = first_page
else:
try:
page_data = crawler.get_comment_page(aid=aid, pn=pn, ps=20, sort=2)
except Exception as exc:
print(f"视频 {bvid}{pn}页失败,结束该视频继续下一个: {exc}")
break
replies = (page_data.get("data") or {}).get("replies") or []
if not replies:
continue
for item in iter_page_comments(replies, bvid=bvid, aid=aid):
if not item["评论内容"]:
continue
comment_id = str(item.get("评论ID") or "").strip()
comment_key = (
str(item.get("评论内容") or "").strip(),
str(item.get("评论时间") or "").strip(),
str(item.get("BV号") or "").strip(),
)
if comment_id and comment_id in seen_ids:
continue
if comment_key in seen_keys:
continue
if comment_id:
seen_ids.add(comment_id)
seen_keys.add(comment_key)
rows.append(item)
newly_added += 1
if len(rows) >= target_comments:
break
if len(rows) >= target_comments:
break
time.sleep(random.uniform(sleep_min, sleep_max))
if newly_added >= 200:
deduplicate_raw_comments(pd.DataFrame(rows)).to_csv(
out_csv, index=False, encoding="utf-8-sig"
)
newly_added = 0
if rows:
deduplicate_raw_comments(pd.DataFrame(rows)).to_csv(
out_csv, index=False, encoding="utf-8-sig"
)
if len(rows) >= target_comments:
break
if len(rows) >= target_comments:
break
if round_idx < total_rounds:
cooldown = max(0.0, float(round_cooldown))
print(f"{round_idx}轮结束,当前{len(rows)}条,等待{cooldown:.0f}秒后继续下一轮。")
time.sleep(cooldown)
df = deduplicate_raw_comments(pd.DataFrame(rows))
df.to_csv(out_csv, index=False, encoding="utf-8-sig")
print(f"抓取完成,实际评论数: {len(df)},文件: {out_csv}")
if len(df) < min_comments:
print(
f"警告: 当前样本少于{min_comments}条。可补充Cookie、放慢速率、或提高target_comments再次抓取。"
)
return df
def load_stopwords(stopwords_path: Path) -> set:
if not stopwords_path.exists():
raise FileNotFoundError(f"停用词文件不存在: {stopwords_path}")
with stopwords_path.open("r", encoding="utf-8") as f:
return {line.strip() for line in f if line.strip()}
def clean_comment_text(text: str) -> str:
text = str(text or "")
text = re.sub(r"\s+", "", text)
text = re.sub(r"[A-Za-z]", "", text)
text = re.sub(r"[^\u4e00-\u9fff0-9]", "", text)
return text
def tokenize_text(text: str, stopwords: set) -> List[str]:
tokens = jieba.lcut(text, cut_all=False)
return [t.strip() for t in tokens if t.strip() and t not in stopwords]
def preprocess_comments(raw_csv: Path, out_csv: Path, stopwords_path: Path) -> pd.DataFrame:
if not raw_csv.exists():
raise FileNotFoundError(f"原始评论文件不存在: {raw_csv}")
stopwords = load_stopwords(stopwords_path)
df = pd.read_csv(raw_csv)
before = len(df)
df = df.drop_duplicates(subset=["评论内容"], keep="first")
after_dedup = len(df)
df["清洗文本"] = df["评论内容"].astype(str).map(clean_comment_text)
df = df[df["清洗文本"].str.len() > 0].copy()
df["词列表"] = df["清洗文本"].map(lambda x: tokenize_text(x, stopwords))
df["分词文本"] = df["词列表"].map(lambda x: " ".join(x))
df = df[df["分词文本"].str.len() > 0].copy()
df.to_csv(out_csv, index=False, encoding="utf-8-sig")
print(
f"预处理完成: 原始{before}条 -> 去重后{after_dedup}条 -> 有效分词{len(df)}条,文件: {out_csv}"
)
return df
def sentiment_label(score: float, neg_threshold: float = 0.4, pos_threshold: float = 0.6) -> int:
if score <= neg_threshold:
return -1
if score >= pos_threshold:
return 1
return 0
def build_freq(tokens_series: pd.Series, topn: int = 1000) -> List[Tuple[str, int]]:
counter = Counter()
for text in tokens_series.fillna(""):
counter.update([w for w in str(text).split() if w])
return counter.most_common(topn)
def save_wordcloud(freq_items: List[Tuple[str, int]], title: str, out_path: Path, font_path: str) -> None:
if not freq_items:
print(f"跳过词云: {title}(无词频)")
return
wc = WordCloud(
font_path=font_path,
width=1400,
height=900,
background_color="white",
max_words=1000,
colormap="viridis",
)
wc.generate_from_frequencies(dict(freq_items))
plt.figure(figsize=(10, 6))
plt.imshow(wc, interpolation="bilinear")
plt.axis("off")
plt.title(title)
plt.tight_layout()
plt.savefig(out_path, dpi=300)
plt.close()
def plot_confusion(cm: np.ndarray, labels: List[str], title: str, out_path: Path) -> None:
fig, ax = plt.subplots(figsize=(6, 5))
im = ax.imshow(cm, cmap="Blues")
fig.colorbar(im, ax=ax)
ax.set_xticks(range(len(labels)))
ax.set_yticks(range(len(labels)))
ax.set_xticklabels(labels)
ax.set_yticklabels(labels)
ax.set_xlabel("预测类别")
ax.set_ylabel("真实类别")
ax.set_title(title)
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
ax.text(j, i, int(cm[i, j]), ha="center", va="center", color="black")
plt.tight_layout()
plt.savefig(out_path, dpi=300)
plt.close()
def run_exploration(preprocessed_csv: Path, labeled_csv: Path, output_dir: Path, font_path: str) -> pd.DataFrame:
if not preprocessed_csv.exists():
raise FileNotFoundError(f"预处理文件不存在: {preprocessed_csv}")
df = pd.read_csv(preprocessed_csv)
df["情感得分"] = df["清洗文本"].astype(str).map(lambda x: float(SnowNLP(x).sentiments))
df["类别"] = df["情感得分"].map(sentiment_label)
df.to_csv(labeled_csv, index=False, encoding="utf-8-sig")
top_words = build_freq(df["分词文本"], topn=1000)
pd.DataFrame(top_words, columns=["词语", "词频"]).to_csv(
output_dir / "top1000_words.csv", index=False, encoding="utf-8-sig"
)
save_wordcloud(top_words, "整体评论词云图", output_dir / "overall_wordcloud.png", font_path)
label_map = {-1: "负面", 0: "中性", 1: "正面"}
for label, name in label_map.items():
sub_freq = build_freq(df.loc[df["类别"] == label, "分词文本"], topn=1000)
pd.DataFrame(sub_freq, columns=["词语", "词频"]).to_csv(
output_dir / f"top_words_{name}.csv", index=False, encoding="utf-8-sig"
)
save_wordcloud(sub_freq, f"{name}评论词云图", output_dir / f"{name}_wordcloud.png", font_path)
sentiment_counts = df["类别"].value_counts().reindex([-1, 0, 1], fill_value=0)
colors = ["#d73027", "#fee08b", "#1a9850"]
plt.figure(figsize=(8, 6))
plt.pie(
sentiment_counts.values,
labels=[f"负面({sentiment_counts[-1]})", f"中性({sentiment_counts[0]})", f"正面({sentiment_counts[1]})"],
autopct="%1.1f%%",
startangle=150,
colors=colors,
)
plt.title("比亚迪视频评论情感类型分布")
plt.tight_layout()
plt.savefig(output_dir / "sentiment_pie.png", dpi=300)
plt.close()
dt_series = pd.to_datetime(df["评论时间"], errors="coerce")
valid = df.loc[dt_series.notna()].copy()
valid["评论时间_dt"] = dt_series[dt_series.notna()]
monthly = (
valid.groupby(valid["评论时间_dt"].dt.to_period("M"))
.size()
.reset_index(name="评论量")
.rename(columns={"评论时间_dt": "月份"})
)
monthly["月份"] = monthly["月份"].astype(str)
if not monthly.empty:
q1, q2 = monthly["评论量"].quantile([0.33, 0.66]).tolist()
def stage(x: int) -> str:
if x <= q1:
return "初始期"
if x >= q2:
return "爆发期"
return "平稳期"
monthly["阶段"] = monthly["评论量"].map(stage)
monthly.to_csv(output_dir / "monthly_trend.csv", index=False, encoding="utf-8-sig")
x = pd.to_datetime(monthly["月份"]) # 月度序列用于画图
plt.figure(figsize=(11, 6))
plt.plot(x, monthly["评论量"], marker="o", linewidth=2, color="#1f77b4")
stage_colors = {"初始期": "#8ecae6", "平稳期": "#90be6d", "爆发期": "#f94144"}
for s_name, group in monthly.groupby("阶段"):
x_g = pd.to_datetime(group["月份"])
plt.scatter(x_g, group["评论量"], s=65, color=stage_colors[s_name], label=s_name)
plt.title("月度评论量趋势与舆情阶段")
plt.xlabel("月份")
plt.ylabel("评论量")
plt.legend()
plt.grid(alpha=0.25)
plt.tight_layout()
plt.savefig(output_dir / "monthly_trend.png", dpi=300)
plt.close()
top10 = df.sort_values("点赞数", ascending=False).head(10).copy()
top10["评论简写"] = top10["评论内容"].astype(str).map(
lambda x: x if len(x) <= 20 else f"{x[:20]}..."
)
top10.to_csv(output_dir / "top10_liked_comments.csv", index=False, encoding="utf-8-sig")
plt.figure(figsize=(11, 6))
y_pos = np.arange(len(top10))
plt.barh(y_pos, top10["点赞数"].values, color="#3a86ff")
plt.yticks(y_pos, top10["评论简写"].values)
plt.gca().invert_yaxis()
plt.xlabel("点赞数")
plt.ylabel("评论内容")
plt.title("高热度评论Top10")
plt.tight_layout()
plt.savefig(output_dir / "top10_likes_bar.png", dpi=300)
plt.close()
report_lines = [
"# 数据探索结论",
f"总样本数: {len(df)}",
f"负面: {int(sentiment_counts[-1])},中性: {int(sentiment_counts[0])},正面: {int(sentiment_counts[1])}",
"注: 情感标签基于SnowNLP阈值规则<=0.4负面,>=0.6正面,其余中性)。",
]
(output_dir / "exploration_report.md").write_text("\n".join(report_lines), encoding="utf-8")
print(f"探索分析完成,标签数据已保存: {labeled_csv}")
return df
def evaluate_model(
y_true: np.ndarray,
y_pred: np.ndarray,
method_name: str,
output_dir: Path,
) -> Dict[str, float]:
acc = accuracy_score(y_true, y_pred)
p_macro, r_macro, f1_macro, _ = precision_recall_fscore_support(
y_true, y_pred, average="macro", zero_division=0
)
neg_p, neg_r, neg_f1, _ = precision_recall_fscore_support(
y_true, y_pred, labels=[-1], average=None, zero_division=0
)
cm = confusion_matrix(y_true, y_pred, labels=[-1, 0, 1])
plot_confusion(
cm,
labels=["负面(-1)", "中性(0)", "正面(1)"],
title=f"{method_name} 混淆矩阵",
out_path=output_dir / f"confusion_{method_name}.png",
)
report_dict = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
pd.DataFrame(report_dict).T.to_csv(
output_dir / f"classification_report_{method_name}.csv",
encoding="utf-8-sig",
)
return {
"方法": method_name,
"Accuracy": acc,
"Precision_macro": p_macro,
"Recall_macro": r_macro,
"F1_macro": f1_macro,
"Precision_neg": float(neg_p[0]),
"Recall_neg": float(neg_r[0]),
"F1_neg": float(neg_f1[0]),
}
def run_modeling(labeled_csv: Path, stopwords_path: Path, output_dir: Path) -> pd.DataFrame:
if not labeled_csv.exists():
raise FileNotFoundError(f"标签数据文件不存在: {labeled_csv}")
stopwords = load_stopwords(stopwords_path)
df = pd.read_csv(labeled_csv)
df = df[df["分词文本"].astype(str).str.strip().ne("")].copy()
df = df[df["类别"].isin([-1, 0, 1])].copy()
texts = df["分词文本"].astype(str).values
y = df["类别"].astype(int).values
indices = np.arange(len(df))
idx_train, idx_test = train_test_split(
indices, test_size=0.2, random_state=42, stratify=y
)
y_train = y[idx_train]
y_test = y[idx_test]
tfidf = TfidfTransformer()
vec1 = CountVectorizer()
x1_train_counts = vec1.fit_transform(texts[idx_train])
x1_test_counts = vec1.transform(texts[idx_test])
x1_train = tfidf.fit_transform(x1_train_counts)
x1_test = tfidf.transform(x1_test_counts)
clf1 = MultinomialNB()
clf1.fit(x1_train, y_train)
pred1 = clf1.predict(x1_test)
result1 = evaluate_model(y_test, pred1, "method1_default", output_dir)
try:
vec2 = CountVectorizer(max_df=0.85, min_df=5, stop_words=list(stopwords))
x2_train_counts = vec2.fit_transform(texts[idx_train])
x2_test_counts = vec2.transform(texts[idx_test])
except ValueError:
vec2 = CountVectorizer(max_df=0.9, min_df=2, stop_words=list(stopwords))
x2_train_counts = vec2.fit_transform(texts[idx_train])
x2_test_counts = vec2.transform(texts[idx_test])
x2_train = tfidf.fit_transform(x2_train_counts)
x2_test = tfidf.transform(x2_test_counts)
clf2 = MultinomialNB()
clf2.fit(x2_train, y_train)
pred2 = clf2.predict(x2_test)
result2 = evaluate_model(y_test, pred2, "method2_improved", output_dir)
dt = pd.to_datetime(df["评论时间"], errors="coerce")
date_ord = dt.map(lambda x: x.toordinal() if pd.notna(x) else np.nan)
date_ord = date_ord.fillna(date_ord.median())
likes = pd.to_numeric(df["点赞数"], errors="coerce").fillna(0) + 1
numeric = np.column_stack([date_ord.values, likes.values])
scaler = MinMaxScaler()
num_train = scaler.fit_transform(numeric[idx_train])
num_test = scaler.transform(numeric[idx_test])
x3_train = hstack([x2_train, csr_matrix(num_train)])
x3_test = hstack([x2_test, csr_matrix(num_test)])
clf3 = MultinomialNB()
clf3.fit(x3_train, y_train)
pred3 = clf3.predict(x3_test)
result3 = evaluate_model(y_test, pred3, "method2_plus_features", output_dir)
result_df = pd.DataFrame([result1, result2, result3])
result_df.to_csv(output_dir / "model_metrics_summary.csv", index=False, encoding="utf-8-sig")
compare_text = [
"# 模型评估结论",
f"方法一准确率: {result1['Accuracy']:.4f}",
f"方法二准确率: {result2['Accuracy']:.4f}",
f"方法二+时间点赞特征准确率: {result3['Accuracy']:.4f}",
f"方法二负面精确率: {result2['Precision_neg']:.4f},负面召回率: {result2['Recall_neg']:.4f}",
f"优化后负面精确率: {result3['Precision_neg']:.4f},负面召回率: {result3['Recall_neg']:.4f}",
"负面预测局限: 负样本占比可能偏低且吐槽语义常带反讽或上下文依赖SnowNLP弱监督标签会传递噪声。",
]
(output_dir / "model_report.md").write_text("\n".join(compare_text), encoding="utf-8")
print(f"建模完成,评估汇总文件: {output_dir / 'model_metrics_summary.csv'}")
return result_df
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="比亚迪B站评论情感分析项目")
sub = parser.add_subparsers(dest="command", required=True)
crawl_p = sub.add_parser("crawl", help="阶段一: 爬取评论(单次单视频)")
crawl_p.add_argument("--bvid", type=str, required=True, help="单次爬取的视频BV号")
crawl_p.add_argument("--target-comments", type=int, default=4500)
crawl_p.add_argument("--min-comments", type=int, default=4000)
crawl_p.add_argument("--sleep-min", type=float, default=0.35)
crawl_p.add_argument("--sleep-max", type=float, default=0.85)
crawl_p.add_argument("--rounds", type=int, default=1)
crawl_p.add_argument("--round-cooldown", type=float, default=45.0)
sub.add_parser("preprocess", help="阶段二: 文本预处理")
sub.add_parser("explore", help="阶段三: 数据探索")
sub.add_parser("model", help="阶段四/五: 建模与评估")
all_p = sub.add_parser("all", help="执行全流程(单次单视频)")
all_p.add_argument("--bvid", type=str, required=True, help="全流程中爬取的视频BV号")
all_p.add_argument("--target-comments", type=int, default=4500)
all_p.add_argument("--min-comments", type=int, default=4000)
all_p.add_argument("--sleep-min", type=float, default=0.35)
all_p.add_argument("--sleep-max", type=float, default=0.85)
all_p.add_argument("--rounds", type=int, default=1)
all_p.add_argument("--round-cooldown", type=float, default=45.0)
return parser.parse_args()
def main() -> None:
args = parse_args()
root = get_project_root()
paths = ensure_dirs(root)
stopwords_path = root / "cn_stopwords.txt"
font_path = ""
if args.command in {"explore", "model", "all"}:
font_path, font_name = detect_chinese_font()
setup_matplotlib(font_name)
if args.command == "crawl":
bvid = args.bvid.strip()
if not bvid:
raise ValueError("请通过 --bvid 传入有效的BV号")
crawl_comments(
bvids=[bvid],
out_csv=paths["raw_csv"],
target_comments=args.target_comments,
min_comments=args.min_comments,
sleep_min=args.sleep_min,
sleep_max=args.sleep_max,
rounds=args.rounds,
round_cooldown=args.round_cooldown,
)
elif args.command == "preprocess":
preprocess_comments(paths["raw_csv"], paths["preprocessed_csv"], stopwords_path)
elif args.command == "explore":
run_exploration(
preprocessed_csv=paths["preprocessed_csv"],
labeled_csv=paths["labeled_csv"],
output_dir=paths["outputs"],
font_path=font_path,
)
elif args.command == "model":
run_modeling(paths["labeled_csv"], stopwords_path, paths["outputs"])
elif args.command == "all":
bvid = args.bvid.strip()
if not bvid:
raise ValueError("请通过 --bvid 传入有效的BV号")
crawl_comments(
bvids=[bvid],
out_csv=paths["raw_csv"],
target_comments=args.target_comments,
min_comments=args.min_comments,
sleep_min=args.sleep_min,
sleep_max=args.sleep_max,
rounds=args.rounds,
round_cooldown=args.round_cooldown,
)
preprocess_comments(paths["raw_csv"], paths["preprocessed_csv"], stopwords_path)
run_exploration(
preprocessed_csv=paths["preprocessed_csv"],
labeled_csv=paths["labeled_csv"],
output_dir=paths["outputs"],
font_path=font_path,
)
run_modeling(paths["labeled_csv"], stopwords_path, paths["outputs"])
if __name__ == "__main__":
main()