Initial commit: DV homework_2

This commit is contained in:
2026-03-14 15:24:57 +08:00
commit 3c1f431ca9
38 changed files with 1580 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
venv/
*.log

63
README.md Normal file

@@ -0,0 +1,63 @@
# Documentation
See [report.md](report.md) for the full experiment report.
## Quick Start
1. Activate the virtual environment:
```bash
source venv/bin/activate
```
2. Install dependencies:
```bash
pip install -e .
```
3. Run the full experiment pipeline:
```bash
python scripts/run_all.py
```
Running it generates:
- image assets under assets/ (including figure1 through figure4)
- benchmark JSON and CSV under results/
- report.md in the project root
## Directory Layout
```text
homework_2/
├── assets/
├── results/
├── scripts/
│ └── run_all.py
├── src/
│ └── float_index_sort/
│ ├── __init__.py
│ ├── benchmark.py
│ ├── bucket.py
│ ├── core.py
│ ├── pipeline.py
│ ├── plots.py
│ ├── radix.py
│ └── report.py
├── tests/
│ └── test_algorithms.py
├── pyproject.toml
└── README.md
```
## Notes
The default run uses the assignment's main scale, N = 10,000,000 (tens of millions), with 1 repeat, covering both the uniform and clustered distributions.
If your machine's memory and time budget allow, you can increase the scale, for example:
```bash
python scripts/run_all.py --sizes 10000000 20000000 --repeats 2
```

BIN
assets/benchmark_ratio.png Normal file

BIN
assets/bucket_balance.png Normal file


20
pyproject.toml Normal file

@@ -0,0 +1,20 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "float-index-sort-lab"
version = "0.1.0"
description = "Course project for float32 index sorting with pure Python and NumPy"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"matplotlib>=3.9",
"numpy>=2.1",
]
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]

114
report.md Normal file

@@ -0,0 +1,114 @@
# Lecture 2 Assignment Report: Design and Implementation of Index Sorting for Massive float32 Data
## 1. Data Generation and Sources
This experiment uses pure Python + NumPy to generate two kinds of float32 data as benchmark input:
- uniform: drawn from a random uniform distribution
- clustered: drawn from a three-component Gaussian mixture
All data is generated locally at run time; no external datasets are involved. Test types and scales:
- Data type: float32
- Distributions: uniform, clustered
- Size: 10,000,000
- Repeats per case: 1
## 2. Baseline: Native NumPy argsort Performance
The native NumPy baseline serves as the reference upper bound and runs with stable sorting. Results:
| Distribution | Size | Mean time/s | Std dev/s |
| --------- | ---------- | ---------- | -------- |
| clustered | 10,000,000 | 1.5460 | 0.0000 |
| uniform | 10,000,000 | 1.4686 | 0.0000 |
The baseline shows that even at the ten-million scale, the official NumPy implementation remains highly efficient and is the key reference for the custom algorithms below.
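The baseline timing can be sketched with a simple `perf_counter` loop (a minimal stand-alone sketch; the project's benchmark.py wraps the same idea in its `benchmark_once` helper):

```python
import numpy as np
from time import perf_counter

def time_argsort(values: np.ndarray, repeats: int = 3) -> float:
    """Return the mean wall-clock seconds of stable np.argsort over `repeats` runs."""
    timings = []
    for _ in range(repeats):
        start = perf_counter()
        np.argsort(values, kind="stable")
        timings.append(perf_counter() - start)
    return float(np.mean(timings))

sample = np.random.default_rng(0).random(100_000, dtype=np.float32)
mean_seconds = time_argsort(sample)
```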
## 3. Algorithm Principles (with Figures)
### 3.1 Approach 1: Bit-level LSD Radix Sort on Floats
Reinterpret the float32 view as uint32 and apply a monotone mapping:
- negative numbers: key = ~raw_bits
- non-negative numbers: key = raw_bits ^ 0x80000000
Then run four stable 8-bit distribution passes (shift = 0, 8, 16, 24) to obtain the final indices.
![Figure 1: float32 bit layout and sort order](assets/figure1_float32_layout.png)
![Figure 2: Negative numbers before and after the bit flip](assets/figure2_negative_flip.png)
![Figure 3: Bucket distribution of each radix sort pass](assets/figure3_radix_bucket_pass.png)
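The monotone mapping can be sketched as follows (a minimal stand-alone version; the project's `float32_to_ordered_uint32` additionally handles NaN):

```python
import numpy as np

def to_ordered_key(values: np.ndarray) -> np.ndarray:
    """Map float32 bits to uint32 keys whose unsigned order matches float order."""
    raw = np.asarray(values, dtype=np.float32).view(np.uint32)
    sign = np.uint32(0x80000000)
    # Negative floats: flip all bits; non-negative floats: flip only the sign bit.
    return np.where((raw & sign) != 0, ~raw, raw ^ sign)

vals = np.array([-2.0, -0.5, 0.0, 0.5, 3.0], dtype=np.float32)
keys = to_ordered_key(vals)
assert np.all(keys[:-1] < keys[1:])  # ascending floats -> ascending uint32 keys
```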
### 3.2 Approach 2: Multi-level Histogram + Bucket Sort
Bucketing starts from the most significant byte, then recursively descends into each non-trivial bucket until its size falls below a threshold, at which point insertion sort finishes it off. This strategy avoids unnecessary data movement when the distribution is relatively spread out, but degrades locally when buckets are unbalanced.
![Figure 5: High-byte bucket balance under different data distributions](assets/bucket_balance.png)
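The first MSD pass can be illustrated with a top-byte histogram (a hedged sketch assuming keys already come from the §3.1 mapping; the full implementation lives in bucket.py):

```python
import numpy as np

def top_byte_buckets(keys: np.ndarray) -> np.ndarray:
    """Histogram of the most significant byte: one count per 256 MSD buckets."""
    digits = (keys >> np.uint32(24)) & np.uint32(0xFF)
    return np.bincount(digits.astype(np.intp), minlength=256)

rng = np.random.default_rng(0)
keys = rng.integers(0, 2**32, size=10_000, dtype=np.uint32)
hist = top_byte_buckets(keys)
assert hist.sum() == 10_000  # every key lands in exactly one bucket
```

On uniform keys this histogram is flat; on clustered data a few buckets dominate, which is exactly the imbalance Figure 5 visualizes.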
## 4. Core Implementation (by Function)
The main functions are:
1. float32_to_ordered_uint32: maps floats to comparable unsigned keys
2. stable_digit_scatter: performs one stable counting distribution pass
3. argsort_float32_radix: 4-pass LSD radix index sort
4. argsort_float32_bucket: MSD multi-level bucketing plus insertion sort for small buckets
5. run_benchmarks: times and summarizes the baseline and experimental groups
Correctness check: 12 algorithm/data combinations were verified; all 12 passed the sortedness check, and all 12 match the stable NumPy argsort indices exactly.
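The idea behind one stable counting pass can be sketched as follows (a deliberately simplified, sequential stand-in for the project's blocked `stable_digit_scatter`):

```python
import numpy as np

def stable_scatter_pass(keys: np.ndarray, idx: np.ndarray, shift: int) -> np.ndarray:
    """One stable counting pass: reorder idx by the 8-bit digit of keys at `shift`."""
    digit = ((keys[idx] >> np.uint32(shift)) & np.uint32(0xFF)).astype(np.intp)
    counts = np.bincount(digit, minlength=256)
    # Exclusive prefix sum: each bucket's first output slot.
    starts = np.concatenate(([0], np.cumsum(counts)[:-1]))
    out = np.empty_like(idx)
    cursor = starts.copy()
    for pos, d in enumerate(digit):  # sequential scan preserves stability
        out[cursor[d]] = idx[pos]
        cursor[d] += 1
    return out

keys = np.array([0x0302, 0x0101, 0x0202, 0x0102], dtype=np.uint32)
idx = stable_scatter_pass(keys, np.arange(4), shift=8)
# Digits at shift=8 are [3, 1, 2, 1]; stable order keeps index 1 before 3.
assert idx.tolist() == [1, 3, 2, 0]
```

The production version replaces the Python loop with per-block histograms and vectorized prefix sums, which is where the reported constant-factor cost comes from.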
## 5. Environment and Results (Comparison Tables + Figures)
Environment:
- Python: 3.12.3
- NumPy: 2.4.3
- Platform: Linux-6.17.0-14-generic-x86_64-with-glibc2.39
- Logical CPUs: 6
Final summary across all groups:
| Distribution | Algorithm | Size | Mean time/s | Std dev/s | vs. NumPy |
| --------- | ------------- | ---------- | ---------- | -------- | ---------- |
| clustered | bucket_msd | 10,000,000 | 60.2848 | 0.0000 | 38.99x |
| clustered | numpy_argsort | 10,000,000 | 1.5460 | 0.0000 | 1.00x |
| clustered | radix_lsd | 10,000,000 | 27.4960 | 0.0000 | 17.79x |
| uniform | bucket_msd | 10,000,000 | 64.0348 | 0.0000 | 43.60x |
| uniform | numpy_argsort | 10,000,000 | 1.4686 | 0.0000 | 1.00x |
| uniform | radix_lsd | 10,000,000 | 26.0530 | 0.0000 | 17.74x |
Figure 4 shows the final performance comparison bar chart (log y-axis):
![Figure 4: Final performance comparison](assets/figure4_benchmark_bar.png)
Figure 6 shows runtime scaling curves for each algorithm:
![Figure 6: Runtime scaling curves](assets/benchmark_runtime.png)
Figure 7 shows the slowdown ratio relative to NumPy argsort:
![Figure 7: Ratio vs. NumPy argsort](assets/benchmark_ratio.png)
## 6. Performance Analysis and Bottlenecks
1. At the ten-million scale, native NumPy argsort retains a clear advantage.
2. The radix route has near-O(n) theoretical complexity, but the constant factor of the pure-NumPy stable scatter is high and strongly memory-bandwidth bound.
3. The bucket route is prone to unbalanced buckets on concentrated distributions, driving up local processing time.
4. The main bottlenecks of the current implementation are large-scale data movement and per-block prefix computation; low-level parallelism or a JIT could further close the gap.
## 7. Conclusions and Future Work
The experiment completes both implementation paths, ten-million-scale benchmarks, figures 1-7, and a fully automated report pipeline as required by the course. Conclusions:
1. Under the constraint of not calling np.sort or np.argsort directly, both approaches produce correct sorted indices.
2. At the ten-million scale, the custom pure-NumPy implementations run stably but still trail the officially optimized low-level implementation.
3. Future directions include reducing write amplification of intermediate arrays, adaptive bucket thresholds, and lower-level SIMD/JIT techniques.
To reproduce:
1. `source venv/bin/activate`
2. `pip install -e .`
3. `python scripts/run_all.py`


@@ -0,0 +1,194 @@
{
"environment": {
"python": "3.12.3",
"numpy": "2.4.3",
"platform": "Linux-6.17.0-14-generic-x86_64-with-glibc2.39",
"processor": "x86_64",
"cpu_count": 6
},
"config": {
"sizes": [
10000000
],
"repeats": 1,
"seed": 20260313,
"distributions": [
"uniform",
"clustered"
]
},
"correctness": [
{
"case": "mixed_sign",
"algorithm": "numpy_argsort",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "mixed_sign",
"algorithm": "radix_lsd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "mixed_sign",
"algorithm": "bucket_msd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "descending",
"algorithm": "numpy_argsort",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "descending",
"algorithm": "radix_lsd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "descending",
"algorithm": "bucket_msd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_uniform",
"algorithm": "numpy_argsort",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_uniform",
"algorithm": "radix_lsd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_uniform",
"algorithm": "bucket_msd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_clustered",
"algorithm": "numpy_argsort",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_clustered",
"algorithm": "radix_lsd",
"matches_reference": true,
"sorted_ok": true
},
{
"case": "random_clustered",
"algorithm": "bucket_msd",
"matches_reference": true,
"sorted_ok": true
}
],
"records": [
{
"distribution": "uniform",
"algorithm": "numpy_argsort",
"size": 10000000,
"repeat": 1,
"seconds": 1.4686321259941906
},
{
"distribution": "uniform",
"algorithm": "radix_lsd",
"size": 10000000,
"repeat": 1,
"seconds": 26.053031775998534
},
{
"distribution": "uniform",
"algorithm": "bucket_msd",
"size": 10000000,
"repeat": 1,
"seconds": 64.0347747229971
},
{
"distribution": "clustered",
"algorithm": "numpy_argsort",
"size": 10000000,
"repeat": 1,
"seconds": 1.5460137709887931
},
{
"distribution": "clustered",
"algorithm": "radix_lsd",
"size": 10000000,
"repeat": 1,
"seconds": 27.49598729700665
},
{
"distribution": "clustered",
"algorithm": "bucket_msd",
"size": 10000000,
"repeat": 1,
"seconds": 60.28482738400635
}
],
"summary": [
{
"distribution": "clustered",
"algorithm": "bucket_msd",
"size": 10000000,
"mean_seconds": 60.28482738400635,
"min_seconds": 60.28482738400635,
"std_seconds": 0.0,
"ratio_vs_numpy": 38.99371953553145
},
{
"distribution": "clustered",
"algorithm": "numpy_argsort",
"size": 10000000,
"mean_seconds": 1.5460137709887931,
"min_seconds": 1.5460137709887931,
"std_seconds": 0.0,
"ratio_vs_numpy": 1.0
},
{
"distribution": "clustered",
"algorithm": "radix_lsd",
"size": 10000000,
"mean_seconds": 27.49598729700665,
"min_seconds": 27.49598729700665,
"std_seconds": 0.0,
"ratio_vs_numpy": 17.785085626644115
},
{
"distribution": "uniform",
"algorithm": "bucket_msd",
"size": 10000000,
"mean_seconds": 64.0347747229971,
"min_seconds": 64.0347747229971,
"std_seconds": 0.0,
"ratio_vs_numpy": 43.60164372657227
},
{
"distribution": "uniform",
"algorithm": "numpy_argsort",
"size": 10000000,
"mean_seconds": 1.4686321259941906,
"min_seconds": 1.4686321259941906,
"std_seconds": 0.0,
"ratio_vs_numpy": 1.0
},
{
"distribution": "uniform",
"algorithm": "radix_lsd",
"size": 10000000,
"mean_seconds": 26.053031775998534,
"min_seconds": 26.053031775998534,
"std_seconds": 0.0,
"ratio_vs_numpy": 17.739658090593608
}
]
}


@@ -0,0 +1,7 @@
distribution,algorithm,size,mean_seconds,min_seconds,std_seconds,ratio_vs_numpy
clustered,bucket_msd,10000000,60.28482738400635,60.28482738400635,0.0,38.99371953553145
clustered,numpy_argsort,10000000,1.5460137709887931,1.5460137709887931,0.0,1.0
clustered,radix_lsd,10000000,27.49598729700665,27.49598729700665,0.0,17.785085626644115
uniform,bucket_msd,10000000,64.0347747229971,64.0347747229971,0.0,43.60164372657227
uniform,numpy_argsort,10000000,1.4686321259941906,1.4686321259941906,0.0,1.0
uniform,radix_lsd,10000000,26.053031775998534,26.053031775998534,0.0,17.739658090593608

16
scripts/run_all.py Normal file

@@ -0,0 +1,16 @@
from __future__ import annotations
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = PROJECT_ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
from float_index_sort.pipeline import main
if __name__ == "__main__":
main()


@@ -0,0 +1,4 @@
from .bucket import argsort_float32_bucket
from .radix import argsort_float32_radix
__all__ = ["argsort_float32_bucket", "argsort_float32_radix"]


@@ -0,0 +1,211 @@
from __future__ import annotations
import csv
import json
import os
import platform
from dataclasses import dataclass
from pathlib import Path
from time import perf_counter
from typing import Callable, Iterable
import numpy as np
from .bucket import argsort_float32_bucket
from .core import ensure_float32_array, verify_sorted_indices
from .radix import argsort_float32_radix
Sorter = Callable[[np.ndarray], np.ndarray]
@dataclass(slots=True)
class BenchmarkConfig:
sizes: tuple[int, ...] = (10_000_000,)
repeats: int = 1
seed: int = 20260313
distributions: tuple[str, ...] = ("uniform", "clustered")
def uniform_data(size: int, seed: int) -> np.ndarray:
rng = np.random.default_rng(seed)
return rng.random(size, dtype=np.float32)
def clustered_data(size: int, seed: int) -> np.ndarray:
rng = np.random.default_rng(seed)
left_size = size // 3
mid_size = size // 3
right_size = size - left_size - mid_size
left = rng.normal(loc=-1.5, scale=0.25, size=left_size).astype(np.float32)
mid = rng.normal(loc=0.0, scale=0.05, size=mid_size).astype(np.float32)
right = rng.normal(loc=1.25, scale=0.4, size=right_size).astype(np.float32)
values = np.concatenate((left, mid, right)).astype(np.float32, copy=False)
rng.shuffle(values)
return values
DATA_FACTORIES: dict[str, Callable[[int, int], np.ndarray]] = {
"uniform": uniform_data,
"clustered": clustered_data,
}
ALGORITHMS: dict[str, Sorter] = {
"numpy_argsort": lambda values: np.argsort(values, kind="stable"),
"radix_lsd": argsort_float32_radix,
"bucket_msd": argsort_float32_bucket,
}
def environment_snapshot() -> dict[str, str | int]:
return {
"python": platform.python_version(),
"numpy": np.__version__,
"platform": platform.platform(),
"processor": platform.processor() or "unknown",
"cpu_count": os.cpu_count() or 0,
}
def correctness_cases(seed: int) -> dict[str, np.ndarray]:
rng = np.random.default_rng(seed)
return {
"mixed_sign": np.array([
-np.inf,
-9.5,
-2.0,
-0.0,
0.0,
1.0,
1.0,
3.25,
np.inf,
], dtype=np.float32),
"descending": np.linspace(8.0, -8.0, num=257, dtype=np.float32),
"random_uniform": rng.normal(loc=0.0, scale=3.0, size=4096).astype(np.float32),
"random_clustered": clustered_data(4096, seed + 1),
}
def run_correctness_suite(seed: int) -> list[dict[str, object]]:
results: list[dict[str, object]] = []
for case_name, values in correctness_cases(seed).items():
expected = np.argsort(values, kind="stable")
for algorithm_name, sorter in ALGORITHMS.items():
got = sorter(values)
matches_reference = np.array_equal(got, expected)
sorted_ok = verify_sorted_indices(values, got)
results.append(
{
"case": case_name,
"algorithm": algorithm_name,
"matches_reference": bool(matches_reference),
"sorted_ok": bool(sorted_ok),
}
)
if not sorted_ok:
raise AssertionError(f"{algorithm_name} failed on {case_name}")
return results
def benchmark_once(sorter: Sorter, values: np.ndarray) -> tuple[float, np.ndarray]:
start = perf_counter()
indices = sorter(values)
elapsed = perf_counter() - start
return elapsed, indices
def summarize_records(records: list[dict[str, object]]) -> list[dict[str, object]]:
grouped: dict[tuple[str, str, int], list[float]] = {}
for row in records:
key = (str(row["distribution"]), str(row["algorithm"]), int(row["size"]))
grouped.setdefault(key, []).append(float(row["seconds"]))
summary: list[dict[str, object]] = []
baselines: dict[tuple[str, int], float] = {}
for (distribution, algorithm, size), timings in grouped.items():
mean_seconds = float(np.mean(timings))
min_seconds = float(np.min(timings))
std_seconds = float(np.std(timings))
row = {
"distribution": distribution,
"algorithm": algorithm,
"size": size,
"mean_seconds": mean_seconds,
"min_seconds": min_seconds,
"std_seconds": std_seconds,
}
summary.append(row)
if algorithm == "numpy_argsort":
baselines[(distribution, size)] = mean_seconds
for row in summary:
baseline = baselines[(str(row["distribution"]), int(row["size"]))]
row["ratio_vs_numpy"] = float(row["mean_seconds"]) / baseline if baseline else float("nan")
summary.sort(key=lambda item: (str(item["distribution"]), int(item["size"]), str(item["algorithm"])))
return summary
def write_csv(path: Path, rows: Iterable[dict[str, object]]) -> None:
rows = list(rows)
if not rows:
return
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
def run_benchmarks(config: BenchmarkConfig, results_dir: Path) -> dict[str, object]:
results_dir.mkdir(parents=True, exist_ok=True)
correctness = run_correctness_suite(config.seed)
records: list[dict[str, object]] = []
for distribution in config.distributions:
factory = DATA_FACTORIES[distribution]
for size_index, size in enumerate(config.sizes):
seed = config.seed + size_index * 97 + len(distribution)
values = ensure_float32_array(factory(size, seed))
print(f"[benchmark] distribution={distribution}, size={size:,}", flush=True)
for algorithm_name, sorter in ALGORITHMS.items():
for repeat in range(config.repeats):
seconds, indices = benchmark_once(sorter, values)
if repeat == 0 and not verify_sorted_indices(values, indices):
raise AssertionError(f"{algorithm_name} produced unsorted indices for {distribution}/{size}")
print(
f"[benchmark] algorithm={algorithm_name}, repeat={repeat + 1}, seconds={seconds:.4f}",
flush=True,
)
records.append(
{
"distribution": distribution,
"algorithm": algorithm_name,
"size": size,
"repeat": repeat + 1,
"seconds": seconds,
}
)
summary = summarize_records(records)
payload = {
"environment": environment_snapshot(),
"config": {
"sizes": list(config.sizes),
"repeats": config.repeats,
"seed": config.seed,
"distributions": list(config.distributions),
},
"correctness": correctness,
"records": records,
"summary": summary,
}
json_path = results_dir / "benchmark_results.json"
csv_path = results_dir / "benchmark_summary.csv"
json_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
write_csv(csv_path, summary)
return payload


@@ -0,0 +1,75 @@
from __future__ import annotations
import numpy as np
from .core import ensure_float32_array, float32_to_ordered_uint32, stable_digit_scatter
def _insertion_sort_segment(keys: np.ndarray, idx: np.ndarray, start: int, end: int) -> None:
for current in range(start + 1, end):
key_value = keys[current]
index_value = idx[current]
scan = current - 1
while scan >= start and keys[scan] > key_value:
keys[scan + 1] = keys[scan]
idx[scan + 1] = idx[scan]
scan -= 1
keys[scan + 1] = key_value
idx[scan + 1] = index_value
def argsort_float32_bucket(
values: np.ndarray,
block_size: int = 4096,
insertion_threshold: int = 96,
) -> np.ndarray:
array = ensure_float32_array(values)
length = int(array.size)
if length == 0:
return np.empty(0, dtype=np.int64)
keys = float32_to_ordered_uint32(array).copy()
idx = np.arange(length, dtype=np.int64)
temp_keys = np.empty_like(keys)
temp_idx = np.empty_like(idx)
stack: list[tuple[int, int, int]] = [(0, length, 24)]
while stack:
start, end, shift = stack.pop()
segment_length = end - start
if segment_length <= 1:
continue
if shift < 0 or segment_length <= insertion_threshold:
_insertion_sort_segment(keys, idx, start, end)
continue
counts = stable_digit_scatter(
keys_src=keys,
idx_src=idx,
keys_dst=temp_keys,
idx_dst=temp_idx,
shift=shift,
start=start,
end=end,
block_size=block_size,
)
keys[start:end] = temp_keys[start:end]
idx[start:end] = temp_idx[start:end]
bucket_starts = np.empty_like(counts, dtype=np.int64)
bucket_starts[0] = start
if counts.size > 1:
np.cumsum(counts[:-1], out=bucket_starts[1:])
bucket_starts[1:] += start
for bucket in range(counts.size - 1, -1, -1):
child_start = int(bucket_starts[bucket])
child_end = child_start + int(counts[bucket])
if child_end - child_start > 1:
stack.append((child_start, child_end, shift - 8))
return idx


@@ -0,0 +1,107 @@
from __future__ import annotations
import math
from typing import Iterable
import numpy as np
SIGN_MASK = np.uint32(0x80000000)
ALL_BITS = np.uint32(0xFFFFFFFF)
def ensure_float32_array(values: Iterable[float] | np.ndarray) -> np.ndarray:
array = np.asarray(values, dtype=np.float32)
if array.ndim != 1:
raise ValueError("Expected a one-dimensional float32 array.")
return array
def float32_to_ordered_uint32(values: Iterable[float] | np.ndarray) -> np.ndarray:
array = ensure_float32_array(values)
raw = array.view(np.uint32)
masks = np.where((raw & SIGN_MASK) != 0, ALL_BITS, SIGN_MASK)
keys = raw ^ masks
nan_mask = np.isnan(array)
if np.any(nan_mask):
keys = keys.copy()
keys[nan_mask] = ALL_BITS
return keys
def stable_digit_scatter(
keys_src: np.ndarray,
idx_src: np.ndarray,
keys_dst: np.ndarray,
idx_dst: np.ndarray,
shift: int,
start: int = 0,
end: int | None = None,
block_size: int = 4096,
num_buckets: int = 256,
) -> np.ndarray:
if end is None:
end = int(keys_src.size)
if start < 0 or end < start or end > int(keys_src.size):
raise ValueError("Invalid scatter segment.")
if end == start:
return np.zeros(num_buckets, dtype=np.int64)
segment_keys = keys_src[start:end]
digit = ((segment_keys >> np.uint32(shift)) & np.uint32(num_buckets - 1)).astype(np.uint16)
counts = np.bincount(digit, minlength=num_buckets).astype(np.int64, copy=False)
bucket_offsets = np.empty(num_buckets, dtype=np.int64)
bucket_offsets[0] = start
if num_buckets > 1:
np.cumsum(counts[:-1], out=bucket_offsets[1:])
bucket_offsets[1:] += start
block_count = math.ceil((end - start) / block_size)
block_hist = np.zeros((block_count, num_buckets), dtype=np.int64)
for block_index in range(block_count):
block_start = start + block_index * block_size
block_end = min(block_start + block_size, end)
local_digit = digit[block_start - start:block_end - start]
block_hist[block_index] = np.bincount(local_digit, minlength=num_buckets)
block_offsets = np.cumsum(block_hist, axis=0, dtype=np.int64) - block_hist
block_offsets += bucket_offsets
bucket_ids = np.arange(num_buckets, dtype=np.uint16)
for block_index in range(block_count):
block_start = start + block_index * block_size
block_end = min(block_start + block_size, end)
local_digit = digit[block_start - start:block_end - start]
if local_digit.size == 0:
continue
one_hot = local_digit[None, :] == bucket_ids[:, None]
local_rank = np.cumsum(one_hot, axis=1, dtype=np.uint32) - 1
positions = block_offsets[block_index, local_digit] + local_rank[local_digit, np.arange(local_digit.size)]
idx_dst[positions] = idx_src[block_start:block_end]
keys_dst[positions] = keys_src[block_start:block_end]
return counts
def verify_sorted_indices(values: np.ndarray, indices: np.ndarray) -> bool:
if values.ndim != 1 or indices.ndim != 1:
return False
if values.size != indices.size:
return False
ordered = values[indices]
if ordered.size <= 1:
return True
nan_mask = np.isnan(ordered)
finite = ordered[~nan_mask]
if finite.size > 1 and not np.all(finite[:-1] <= finite[1:]):
return False
if np.any(nan_mask) and not np.all(nan_mask[np.argmax(nan_mask):]):
return False
return True


@@ -0,0 +1,37 @@
from __future__ import annotations
import argparse
from pathlib import Path
from .benchmark import BenchmarkConfig, run_benchmarks
from .plots import generate_all_figures
from .report import write_report
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run the float32 index sorting coursework pipeline.")
parser.add_argument("--sizes", type=int, nargs="*", default=[10_000_000], help="Benchmark sizes.")
parser.add_argument("--repeats", type=int, default=1, help="Repeats per benchmark case.")
parser.add_argument("--seed", type=int, default=20260313, help="Random seed.")
parser.add_argument("--project-root", type=Path, default=Path(__file__).resolve().parents[2], help="Project root path.")
return parser.parse_args()
def run_pipeline(project_root: Path, sizes: list[int], repeats: int, seed: int) -> None:
assets_dir = project_root / "assets"
results_dir = project_root / "results"
report_path = project_root / "report.md"
config = BenchmarkConfig(sizes=tuple(sizes), repeats=repeats, seed=seed)
results = run_benchmarks(config=config, results_dir=results_dir)
generate_all_figures(results=results, assets_dir=assets_dir)
write_report(results=results, output_path=report_path)
def main() -> None:
args = parse_args()
run_pipeline(project_root=args.project_root, sizes=args.sizes, repeats=args.repeats, seed=args.seed)
if __name__ == "__main__":
main()


@@ -0,0 +1,359 @@
from __future__ import annotations
import json
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import font_manager
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Rectangle
from .benchmark import DATA_FACTORIES
from .core import float32_to_ordered_uint32
def _pick_chinese_font() -> str | None:
local_font_candidates = [
Path(__file__).resolve().parents[2] / "assets" / "fonts" / "NotoSansCJKsc-Regular.otf",
Path(__file__).resolve().parents[2] / "assets" / "fonts" / "SourceHanSansCN-Regular.otf",
]
for font_path in local_font_candidates:
if font_path.exists():
font_manager.fontManager.addfont(str(font_path))
return FontProperties(fname=str(font_path)).get_name()
preferred = [
"Noto Sans CJK SC",
"Noto Sans SC",
"Source Han Sans CN",
"WenQuanYi Zen Hei",
"Microsoft YaHei",
"SimHei",
"PingFang SC",
"Heiti SC",
"Arial Unicode MS",
]
installed = {font.name for font in font_manager.fontManager.ttflist}
for name in preferred:
if name in installed:
return name
return None
def _style() -> None:
chinese_font = _pick_chinese_font()
sans_fonts = [
"Noto Sans CJK SC",
"Noto Sans SC",
"Source Han Sans CN",
"WenQuanYi Zen Hei",
"Microsoft YaHei",
"SimHei",
"PingFang SC",
"Heiti SC",
"DejaVu Sans",
]
if chinese_font is not None:
sans_fonts = [chinese_font, *[font for font in sans_fonts if font != chinese_font]]
plt.rcParams.update(
{
"figure.dpi": 160,
"axes.spines.top": False,
"axes.spines.right": False,
"axes.grid": True,
"grid.alpha": 0.15,
"axes.facecolor": "#fbfaf5",
"figure.facecolor": "#f3efe3",
"savefig.facecolor": "#f3efe3",
"font.size": 10,
"font.family": "sans-serif",
"font.sans-serif": sans_fonts,
"axes.unicode_minus": False,
}
)
def draw_float32_layout(path: Path) -> None:
_style()
fig, ax = plt.subplots(figsize=(10, 2.4))
ax.set_xlim(0, 32)
ax.set_ylim(0, 1)
ax.axis("off")
blocks = [
(0, 1, "#d1495b", "符号位\n1 bit"),
(1, 8, "#edae49", "指数位\n8 bits"),
(9, 23, "#00798c", "尾数位\n23 bits"),
]
for left, width, color, label in blocks:
ax.add_patch(Rectangle((left, 0.25), width, 0.5, facecolor=color, edgecolor="#1f1f1f", linewidth=1.2))
ax.text(left + width / 2, 0.5, label, ha="center", va="center", color="white", fontweight="bold")
ax.text(0, 0.92, "IEEE 754 float32 位布局", ha="left", va="bottom", fontsize=13, fontweight="bold", color="#1f1f1f")
ax.text(
0,
0.08,
"通过符号位映射把浮点比较顺序转换为无符号整数顺序。",
ha="left",
va="bottom",
color="#333333",
)
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def draw_negative_mapping(path: Path) -> None:
_style()
samples = np.array([-7.5, -1.0, -0.0, 0.0, 1.0, 6.25], dtype=np.float32)
raw = samples.view(np.uint32)
mapped = float32_to_ordered_uint32(samples)
fig, ax = plt.subplots(figsize=(12, 3.8))
ax.axis("off")
ax.set_title("float32 到单调 uint32 key 的位映射", loc="left", fontsize=13, fontweight="bold")
table_rows = []
for value, raw_bits, mapped_bits in zip(samples, raw, mapped, strict=True):
table_rows.append(
[
f"{value:>6.2f}",
format(int(raw_bits), "032b"),
format(int(mapped_bits), "032b"),
]
)
table = ax.table(
cellText=table_rows,
colLabels=["数值", "原始位串", "映射后 key"],
cellLoc="left",
colLoc="left",
loc="center",
)
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1, 1.5)
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def draw_radix_distribution(path: Path) -> None:
_style()
rng = np.random.default_rng(20260313)
sample = rng.normal(loc=0.0, scale=1.0, size=4096).astype(np.float32)
keys = float32_to_ordered_uint32(sample)
fig, axes = plt.subplots(2, 2, figsize=(12, 6.8), sharex=True)
colors = ["#33658a", "#86bbd8", "#758e4f", "#f26419"]
for axis, shift, color in zip(axes.ravel(), (0, 8, 16, 24), colors, strict=True):
digit = ((keys >> np.uint32(shift)) & np.uint32(0xFF)).astype(np.uint8)
counts = np.bincount(digit, minlength=256)
axis.bar(np.arange(256), counts, color=color, width=1.0)
axis.set_title(f"第 {shift // 8 + 1} 趟(shift={shift})")
axis.set_ylabel("计数")
axis.set_xlim(0, 255)
for axis in axes[1]:
axis.set_xlabel("桶编号")
fig.suptitle("Radix 每一趟的桶分布", x=0.07, y=0.98, ha="left", fontsize=13, fontweight="bold")
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def draw_performance_comparison_bar(results: dict[str, object], path: Path) -> None:
_style()
summary = list(results["summary"])
algorithms = ["numpy_argsort", "radix_lsd", "bucket_msd"]
algorithm_labels = {
"numpy_argsort": "NumPy argsort",
"radix_lsd": "位级 Radix",
"bucket_msd": "多级 Bucket",
}
distribution_labels = {
"uniform": "均匀分布",
"clustered": "聚簇分布",
}
colors = {
"numpy_argsort": "#1b4332",
"radix_lsd": "#ff7b00",
"bucket_msd": "#6a4c93",
}
groups = sorted(
{(str(row["distribution"]), int(row["size"])) for row in summary},
key=lambda item: (item[0], item[1]),
)
x_axis = np.arange(len(groups))
width = 0.24
fig, ax = plt.subplots(figsize=(12, 4.8))
for offset, algorithm in zip((-width, 0.0, width), algorithms, strict=True):
bars: list[float] = []
for distribution, size in groups:
row = next(
item
for item in summary
if item["distribution"] == distribution and int(item["size"]) == size and item["algorithm"] == algorithm
)
bars.append(float(row["mean_seconds"]))
ax.bar(x_axis + offset, bars, width=width, color=colors[algorithm], label=algorithm_labels[algorithm])
labels = [f"{distribution_labels.get(distribution, distribution)}\nN={size:,}" for distribution, size in groups]
ax.set_xticks(x_axis, labels)
ax.set_yscale("log")
ax.set_xlabel("数据组")
ax.set_ylabel("平均耗时(秒,对数坐标)")
ax.set_title("最终性能对比柱状图", loc="left", fontsize=13, fontweight="bold")
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def draw_bucket_balance(path: Path) -> None:
_style()
uniform = DATA_FACTORIES["uniform"](120_000, 20260313)
clustered = DATA_FACTORIES["clustered"](120_000, 20260314)
uniform_buckets = ((float32_to_ordered_uint32(uniform) >> np.uint32(24)) & np.uint32(0xFF)).astype(np.uint8)
clustered_buckets = ((float32_to_ordered_uint32(clustered) >> np.uint32(24)) & np.uint32(0xFF)).astype(np.uint8)
uniform_hist = np.bincount(uniform_buckets, minlength=256)
clustered_hist = np.bincount(clustered_buckets, minlength=256)
fig, ax = plt.subplots(figsize=(12, 4.2))
axis_x = np.arange(256)
ax.plot(axis_x, uniform_hist, color="#1b4965", linewidth=1.4, label="均匀分布")
ax.plot(axis_x, clustered_hist, color="#c1121f", linewidth=1.4, label="聚簇分布")
ax.set_title("不同数据分布下高位桶均衡性", loc="left", fontsize=13, fontweight="bold")
ax.set_xlabel("桶编号")
ax.set_ylabel("计数")
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def load_results(results_path: Path) -> dict[str, object]:
return json.loads(results_path.read_text(encoding="utf-8"))
def draw_runtime_scaling(results: dict[str, object], path: Path) -> None:
_style()
summary = list(results["summary"])
distributions = list(results["config"]["distributions"])
algorithms = ["numpy_argsort", "radix_lsd", "bucket_msd"]
algorithm_labels = {
"numpy_argsort": "NumPy argsort",
"radix_lsd": "位级 Radix",
"bucket_msd": "多级 Bucket",
}
distribution_labels = {
"uniform": "均匀分布",
"clustered": "聚簇分布",
}
colors = {
"numpy_argsort": "#1b4332",
"radix_lsd": "#ff7b00",
"bucket_msd": "#6a4c93",
}
fig, axes = plt.subplots(1, len(distributions), figsize=(13, 4.6), sharey=True)
if len(distributions) == 1:
axes = [axes]
for axis, distribution in zip(axes, distributions, strict=True):
rows = [row for row in summary if row["distribution"] == distribution]
sizes = sorted({int(row["size"]) for row in rows})
for algorithm in algorithms:
points = [row for row in rows if row["algorithm"] == algorithm]
points.sort(key=lambda item: int(item["size"]))
axis.plot(
sizes,
[float(item["mean_seconds"]) for item in points],
marker="o",
linewidth=2,
color=colors[algorithm],
label=algorithm_labels[algorithm],
)
axis.set_title(distribution_labels.get(distribution, distribution))
axis.set_xlabel("数组规模")
axis.set_xscale("log")
axis.set_yscale("log")
axes[0].set_ylabel("平均耗时(秒)")
axes[0].legend(frameon=False)
fig.suptitle("各算法运行时间缩放曲线", x=0.07, y=0.98, ha="left", fontsize=13, fontweight="bold")
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def draw_speed_ratio(results: dict[str, object], path: Path) -> None:
_style()
summary = list(results["summary"])
distributions = list(results["config"]["distributions"])
focus_algorithms = ["radix_lsd", "bucket_msd"]
algorithm_labels = {
"radix_lsd": "位级 Radix",
"bucket_msd": "多级 Bucket",
}
distribution_labels = {
"uniform": "均匀分布",
"clustered": "聚簇分布",
}
colors = {"radix_lsd": "#ff7b00", "bucket_msd": "#6a4c93"}
fig, axes = plt.subplots(1, len(distributions), figsize=(13, 4.6), sharey=True)
if len(distributions) == 1:
axes = [axes]
for axis, distribution in zip(axes, distributions, strict=True):
rows = [row for row in summary if row["distribution"] == distribution and row["algorithm"] in focus_algorithms]
sizes = sorted({int(row["size"]) for row in rows})
positions = np.arange(len(sizes))
width = 0.35
for offset, algorithm in zip((-width / 2, width / 2), focus_algorithms, strict=True):
points = [row for row in rows if row["algorithm"] == algorithm]
points.sort(key=lambda item: int(item["size"]))
axis.bar(
positions + offset,
[float(item["ratio_vs_numpy"]) for item in points],
width=width,
label=algorithm_labels[algorithm],
color=colors[algorithm],
)
axis.axhline(1.0, color="#222222", linestyle="--", linewidth=1.0)
axis.set_xticks(positions, [f"{size:,}" for size in sizes], rotation=15)
axis.set_title(distribution_labels.get(distribution, distribution))
axis.set_xlabel("Input size")
axes[0].set_ylabel("Runtime ratio vs NumPy")
axes[0].legend(frameon=False)
fig.suptitle("Runtime ratio relative to NumPy argsort", x=0.07, y=0.98, ha="left", fontsize=13, fontweight="bold")
fig.tight_layout()
fig.savefig(path, bbox_inches="tight")
plt.close(fig)
def generate_all_figures(results: dict[str, object], assets_dir: Path) -> None:
assets_dir.mkdir(parents=True, exist_ok=True)
draw_float32_layout(assets_dir / "figure1_float32_layout.png")
draw_negative_mapping(assets_dir / "figure2_negative_flip.png")
draw_radix_distribution(assets_dir / "figure3_radix_bucket_pass.png")
draw_performance_comparison_bar(results, assets_dir / "figure4_benchmark_bar.png")
draw_bucket_balance(assets_dir / "bucket_balance.png")
draw_runtime_scaling(results, assets_dir / "benchmark_runtime.png")
draw_speed_ratio(results, assets_dir / "benchmark_ratio.png")


@@ -0,0 +1,31 @@
from __future__ import annotations
import numpy as np
from .core import ensure_float32_array, float32_to_ordered_uint32, stable_digit_scatter
def argsort_float32_radix(values: np.ndarray, block_size: int = 4096) -> np.ndarray:
array = ensure_float32_array(values)
length = int(array.size)
if length == 0:
return np.empty(0, dtype=np.int64)
src_keys = float32_to_ordered_uint32(array).copy()
src_idx = np.arange(length, dtype=np.int64)
dst_keys = np.empty_like(src_keys)
dst_idx = np.empty_like(src_idx)
for shift in (0, 8, 16, 24):
stable_digit_scatter(
keys_src=src_keys,
idx_src=src_idx,
keys_dst=dst_keys,
idx_dst=dst_idx,
shift=shift,
block_size=block_size,
)
src_keys, dst_keys = dst_keys, src_keys
src_idx, dst_idx = dst_idx, src_idx
return src_idx


@@ -0,0 +1,200 @@
from __future__ import annotations
from pathlib import Path
def _markdown_table(rows: list[dict[str, object]], columns: list[tuple[str, str]]) -> str:
header = "| " + " | ".join(label for _, label in columns) + " |"
split = "| " + " | ".join("---" for _ in columns) + " |"
body = []
for row in rows:
body.append("| " + " | ".join(str(row[key]) for key, _ in columns) + " |")
return "\n".join([header, split, *body])
def _format_summary_rows(results: dict[str, object], distribution: str) -> list[dict[str, object]]:
rows = [row for row in results["summary"] if row["distribution"] == distribution]
formatted: list[dict[str, object]] = []
for row in rows:
formatted.append(
{
"algorithm": row["algorithm"],
"size": f"{int(row['size']):,}",
"mean_seconds": f"{float(row['mean_seconds']):.4f}",
"std_seconds": f"{float(row['std_seconds']):.4f}",
"ratio_vs_numpy": f"{float(row['ratio_vs_numpy']):.2f}x",
}
)
return formatted
def _correctness_summary(results: dict[str, object]) -> str:
total = len(results["correctness"])
passed = sum(1 for row in results["correctness"] if row["sorted_ok"])
exact = sum(1 for row in results["correctness"] if row["matches_reference"])
return f"Validated {total} algorithm/data combinations: {passed} passed the sorted-order check and {exact} matched the stable numpy argsort indices exactly."
def _format_numpy_baseline_rows(results: dict[str, object]) -> list[dict[str, object]]:
rows = [row for row in results["summary"] if row["algorithm"] == "numpy_argsort"]
rows.sort(key=lambda item: (str(item["distribution"]), int(item["size"])))
return [
{
"distribution": str(row["distribution"]),
"size": f"{int(row['size']):,}",
"mean_seconds": f"{float(row['mean_seconds']):.4f}",
"std_seconds": f"{float(row['std_seconds']):.4f}",
}
for row in rows
]
def _format_full_result_rows(results: dict[str, object]) -> list[dict[str, object]]:
rows = list(results["summary"])
rows.sort(key=lambda item: (str(item["distribution"]), int(item["size"]), str(item["algorithm"])))
return [
{
"distribution": str(row["distribution"]),
"algorithm": str(row["algorithm"]),
"size": f"{int(row['size']):,}",
"mean_seconds": f"{float(row['mean_seconds']):.4f}",
"std_seconds": f"{float(row['std_seconds']):.4f}",
"ratio_vs_numpy": f"{float(row['ratio_vs_numpy']):.2f}x",
}
for row in rows
]
def build_report(results: dict[str, object]) -> str:
env = results["environment"]
config = results["config"]
baseline_table = _markdown_table(
_format_numpy_baseline_rows(results),
[
("distribution", "Distribution"),
("size", "Size"),
("mean_seconds", "Mean time (s)"),
("std_seconds", "Std dev (s)"),
],
)
full_table = _markdown_table(
_format_full_result_rows(results),
[
("distribution", "Distribution"),
("algorithm", "Algorithm"),
("size", "Size"),
("mean_seconds", "Mean time (s)"),
("std_seconds", "Std dev (s)"),
("ratio_vs_numpy", "vs numpy"),
],
)
return f"""# Lecture 2 Project Report: Index Sorting for Massive float32 Arrays
## 1. Data Generation
This experiment uses pure Python + NumPy to generate two kinds of float32 evaluation inputs:
- uniform: drawn from a uniform random distribution
- clustered: drawn from a three-component Gaussian mixture
All data is generated locally at run time; no external dataset is required. Test types and sizes:
- dtype: float32
- distributions: {', '.join(config['distributions'])}
- sizes: {', '.join(f'{size:,}' for size in config['sizes'])}
- repeats per configuration: {config['repeats']}
## 2. Native NumPy argsort Baseline
The native NumPy baseline serves as the reference upper bound and runs in stable mode. Results:
{baseline_table}
The baseline shows that at the ten-million scale the official NumPy implementation remains very efficient, and it is the primary point of comparison for the custom algorithms below.
## 3. Algorithm Design (with Figures)
### 3.1 Approach 1: Bit-Level LSD Radix Sort
Reinterpret the float32 view as uint32 and apply a monotone key mapping:
- negative values: key = ~raw_bits
- non-negative values: key = raw_bits ^ 0x80000000
Then run four stable 8-bit scatter passes (shift = 0, 8, 16, 24) to obtain the final indices.
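A minimal, self-contained sketch of this mapping (mirroring what `float32_to_ordered_uint32` in `core.py` is expected to compute; the exact implementation is not shown in this excerpt):

```python
import numpy as np

def ordered_keys(values):
    # View the float32 bits as uint32, then apply the monotone mapping:
    # negative floats (sign bit set) get bitwise NOT, others get their
    # sign bit flipped, so uint32 order matches float order.
    bits = np.asarray(values, dtype=np.float32).view(np.uint32)
    negative = bits >> np.uint32(31) == 1
    return np.where(negative, ~bits, bits ^ np.uint32(0x80000000))
```

Sorting these uint32 keys in ascending order reproduces the float ordering, with -0.0 keyed just below +0.0.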
![Figure 1: float32 bit layout and ordering](assets/figure1_float32_layout.png)
![Figure 2: negative values before and after the bit flip](assets/figure2_negative_flip.png)
![Figure 3: per-pass radix bucket distribution](assets/figure3_radix_bucket_pass.png)
### 3.2 Approach 2: Multi-Level Histogram + Bucket Sort
Partition by the most significant byte, then recursively descend into each non-trivial bucket until its size falls below a threshold, at which point insertion sort finishes the bucket. This strategy avoids unnecessary data movement when the distribution is spread out, but degrades locally when buckets are unbalanced.
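A hedged, self-contained sketch of this strategy (the real `argsort_float32_bucket` is not shown in this excerpt; the function name and the threshold of 64 below are illustrative):

```python
import numpy as np

def bucket_argsort_sketch(keys, threshold=64):
    # keys: monotone uint32 keys; returns indices sorting keys ascending.
    def insertion(sub_idx):
        # Stable insertion sort of a small bucket of indices.
        order = sub_idx.tolist()
        for i in range(1, len(order)):
            j = i
            while j > 0 and keys[order[j - 1]] > keys[order[j]]:
                order[j - 1], order[j] = order[j], order[j - 1]
                j -= 1
        return np.array(order, dtype=np.int64)

    def recurse(sub_idx, shift):
        if sub_idx.size <= threshold or shift < 0:
            return insertion(sub_idx)
        # Bucket by the current byte; boolean masking keeps input order,
        # so the overall sort stays stable.
        digits = (keys[sub_idx] >> np.uint32(shift)) & np.uint32(0xFF)
        parts = []
        for digit in range(256):
            bucket = sub_idx[digits == digit]
            if bucket.size:
                parts.append(recurse(bucket, shift - 8))
        return np.concatenate(parts)

    return recurse(np.arange(keys.size, dtype=np.int64), 24)
```

Only non-empty buckets are recursed into, which is where the savings over a fixed 4-pass radix come from on spread-out data.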
![Figure 5: high-byte bucket balance under different distributions](assets/bucket_balance.png)
## 4. Core Implementation (by Function)
The main functions are:
1. float32_to_ordered_uint32: maps floats to order-preserving unsigned keys
2. stable_digit_scatter: performs one stable counting-scatter pass
3. argsort_float32_radix: the 4-pass LSD radix index sort
4. argsort_float32_bucket: MSD multi-level bucketing with insertion sort for small buckets
5. run_benchmarks: timing and statistics for the baseline and both candidates
Correctness check: {_correctness_summary(results)}
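For reference, one stable 8-bit counting pass can be sketched as follows (an illustrative, unoptimized version of what `stable_digit_scatter` is assumed to do; the real implementation works block-wise over `block_size` chunks and is not shown in this excerpt):

```python
import numpy as np

def stable_pass_sketch(keys, idx, shift):
    # Count digit occurrences, compute start offsets per digit,
    # then scatter elements stably into their digit's slot range.
    digits = ((keys >> np.uint32(shift)) & np.uint32(0xFF)).astype(np.int64)
    counts = np.bincount(digits, minlength=256)
    next_slot = np.concatenate(([0], np.cumsum(counts)[:-1]))
    keys_out = np.empty_like(keys)
    idx_out = np.empty_like(idx)
    for i in range(keys.size):  # per-element loop for clarity, not speed
        pos = next_slot[digits[i]]
        keys_out[pos] = keys[i]
        idx_out[pos] = idx[i]
        next_slot[digits[i]] += 1
    return keys_out, idx_out
```

Chaining this pass over shift = 0, 8, 16, 24 with ping-pong buffers yields the full radix argsort.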
## 5. Environment and Results (Tables + Figures)
Environment:
- Python: {env['python']}
- NumPy: {env['numpy']}
- Platform: {env['platform']}
- Logical CPUs: {env['cpu_count']}
Aggregated results across all configurations:
{full_table}
Figure 4 shows the final performance comparison as a bar chart (log y-axis):
![Figure 4: final performance comparison](assets/figure4_benchmark_bar.png)
Figure 6 shows runtime scaling with input size for each algorithm:
![Figure 6: runtime scaling curves](assets/benchmark_runtime.png)
Figure 7 shows the runtime ratio relative to NumPy argsort:
![Figure 7: ratio relative to NumPy](assets/benchmark_ratio.png)
## 6. Performance Analysis and Bottlenecks
1. At the ten-million scale, native NumPy argsort retains a clear lead.
2. The radix approach is close to O(n) in theory, but the pure-NumPy stable scatter carries a high constant factor and is bound by memory bandwidth.
3. The bucket approach suffers from unbalanced buckets on clustered data, which inflates local processing time.
4. The main bottlenecks are large-scale data movement and in-block prefix computation; low-level parallelism or JIT compilation could narrow the gap further.
## 7. Conclusions and Future Work
This project delivers both implementation paths, ten-million-scale benchmarks, Figures 1-7, and fully automated report generation. Conclusions:
1. Under the constraint of not calling np.sort or np.argsort directly, both approaches produce correct sorting indices.
2. At the ten-million scale, the custom pure-NumPy implementations run reliably, but a performance gap to the optimized native implementation remains.
3. Future directions include reducing write amplification of intermediate arrays, adaptive bucket-threshold tuning, and lower-level SIMD/JIT techniques.
To reproduce:
1. source venv/bin/activate
2. pip install -e .
3. python scripts/run_all.py
"""
def write_report(results: dict[str, object], output_path: Path) -> None:
output_path.write_text(build_report(results), encoding="utf-8")


@@ -0,0 +1,81 @@
Metadata-Version: 2.4
Name: float-index-sort-lab
Version: 0.1.0
Summary: Course project for float32 index sorting with pure Python and NumPy
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: matplotlib>=3.9
Requires-Dist: numpy>=2.1
# Massive Float Index Sorting Course Project
This project implements two float32 index-sorting paths in pure Python + NumPy:
- Approach 1: bit-level LSD radix sort
- Approach 2: multi-level histogram + bucket sort + insertion sort for small buckets
The project also includes:
- a benchmark script
- figure generation
- automatic Markdown report generation
## Quick Start
1. Activate the virtual environment:
```bash
source venv/bin/activate
```
2. Install dependencies:
```bash
pip install -e .
```
3. Run the full experiment pipeline:
```bash
python scripts/run_all.py
```
This produces:
- figures under assets/
- benchmark JSON and CSV under results/
- report.md in the project root
## Layout
```text
homework_2/
├── assets/
├── results/
├── scripts/
│ └── run_all.py
├── src/
│ └── float_index_sort/
│ ├── __init__.py
│ ├── benchmark.py
│ ├── bucket.py
│ ├── core.py
│ ├── pipeline.py
│ ├── plots.py
│ ├── radix.py
│ └── report.py
├── tests/
│ └── test_algorithms.py
├── pyproject.toml
└── README.md
```
## Notes
The default experiment size is the assignment's main scale, N=10,000,000 (ten million), with 1 repeat, covering the uniform and clustered distributions.
With enough memory and time budget, larger runs are possible, e.g.:
```bash
python scripts/run_all.py --sizes 10000000 20000000 --repeats 2
```


@@ -0,0 +1,16 @@
README.md
pyproject.toml
src/float_index_sort/__init__.py
src/float_index_sort/benchmark.py
src/float_index_sort/bucket.py
src/float_index_sort/core.py
src/float_index_sort/pipeline.py
src/float_index_sort/plots.py
src/float_index_sort/radix.py
src/float_index_sort/report.py
src/float_index_sort_lab.egg-info/PKG-INFO
src/float_index_sort_lab.egg-info/SOURCES.txt
src/float_index_sort_lab.egg-info/dependency_links.txt
src/float_index_sort_lab.egg-info/requires.txt
src/float_index_sort_lab.egg-info/top_level.txt
tests/test_algorithms.py


@@ -0,0 +1 @@


@@ -0,0 +1,2 @@
matplotlib>=3.9
numpy>=2.1


@@ -0,0 +1 @@
float_index_sort

Binary file not shown.

39
tests/test_algorithms.py Normal file

@@ -0,0 +1,39 @@
from __future__ import annotations
import unittest
import numpy as np
from float_index_sort import argsort_float32_bucket, argsort_float32_radix
class SortingAlgorithmTests(unittest.TestCase):
def setUp(self) -> None:
self.algorithms = {
"radix": argsort_float32_radix,
"bucket": argsort_float32_bucket,
}
def assert_matches_numpy(self, values: np.ndarray) -> None:
expected = np.argsort(values, kind="stable")
for name, sorter in self.algorithms.items():
with self.subTest(algorithm=name):
actual = sorter(values)
self.assertTrue(np.array_equal(actual, expected))
def test_small_mixed_values(self) -> None:
values = np.array([3.0, -2.5, 1.0, -0.0, 0.0, 1.0, -5.0, np.inf], dtype=np.float32)
self.assert_matches_numpy(values)
def test_descending_values(self) -> None:
values = np.linspace(9.0, -9.0, 513, dtype=np.float32)
self.assert_matches_numpy(values)
def test_random_values(self) -> None:
rng = np.random.default_rng(42)
values = rng.normal(loc=0.0, scale=4.0, size=4096).astype(np.float32)
self.assert_matches_numpy(values)
if __name__ == "__main__":
unittest.main()