MongoDB WiredTiger 引擎压缩算法选型实践

MongoDB WiredTiger 引擎压缩算法选型实践

六一快乐

今天是六一儿童节,不管年龄多大,心中始终有一颗童心。

WiredTiger存储引擎背景

存储引擎是 MongoDB 负责管理数据的主要组件,WiredTiger 存储引擎是默认存储引擎(自 MongoDB 3.2 起)。具体可以参考:https://www.mongodb.com/zh-cn/docs/v5.0/core/wiredtiger/。利用 WiredTiger,MongoDB 能够支持对所有集合和索引进行压缩。压缩能够最大限度减少存储使用量,但会消耗额外的 CPU 资源。

对象默认压缩
集合(Collection)snappy(高速块压缩)
索引(Index)prefix compression(前缀去重)

WiredTiger 支持三种集合压缩算法:

算法特点
snappy默认,速度快,CPU 消耗低
zlib压缩率高,但速度慢
zstdv4.2+ 引入,兼顾压缩率与速度

在真实业务中,压缩策略需要在 读写比例、存储成本、CPU 余量​ 三者之间权衡。

测试三种压缩算法

测试目标

使用Python来简单对比一下三种压缩算法,对比 snappy / zlib / zstd​ 在 MongoDB 文档场景下的表现,为生产环境压缩算法选型提供依据。

测试环境

项目配置
OSmacOS(ARM)
CPUApple M1
Python3.11
依赖python-snappy==0.6.1,zstandard==0.22.0
MongoDB≥ 4.2(仅作为背景,测试为应用层压缩)

⚠️ 说明:

本测试为 应用层内存压缩,与 WiredTiger 内部 blockCompressor行为不完全等价,但趋势一致,可用于选型参考。

MongoDB 4.2+ 支持 zstd,默认压缩级别由 WiredTiger 控制(通常为 3 或 6,不同版本略有差异)。

本次测试采用 zstd level=3,与 MongoDB 默认行为对齐。

安装:pip install python-snappy zstandard

使用脚本生成 100MB JSONL 文件,模拟真实 MongoDB 文档结构:

  • 多种数据类型(字符串、数值、布尔、日期)
  • 嵌套文档(address)
  • 数组(items / tags)
#!/usr/bin/env python3
"""
生成 MongoDB 文档测试文件
- 模拟真实 MongoDB 文档结构
- 包含嵌套文档、数组、各种数据类型
- 生成指定大小的 JSON 文件
"""

import json
import random
import string
import datetime
import sys
from typing import List, Dict, Any

def generate_mongodb_document(doc_id: int) -> Dict[str, Any]:
    """生成一个模拟 MongoDB 文档"""
    
    # 随机生成字符串
    def random_string(length: int) -> str:
        return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
    
    # 随机生成日期
    def random_date(start_year=2020, end_year=2024):
        year = random.randint(start_year, end_year)
        month = random.randint(1, 12)
        day = random.randint(1, 28)
        return datetime.datetime(year, month, day)
    
    # 生成嵌套文档
    address = {
        "street": f"{random.randint(100, 9999)} {random.choice(['Main', 'Oak', 'Pine', 'Maple'])} St",
        "city": random.choice(["New York", "London", "Tokyo", "Beijing", "Sydney"]),
        "state": random_string(2).upper(),
        "zip": f"{random.randint(10000, 99999)}",
        "coordinates": {
            "lat": round(random.uniform(-90, 90), 6),
            "lng": round(random.uniform(-180, 180), 6)
        }
    }
    
    # 生成数组
    tags = random.sample([
        "mongodb", "database", "nosql", "performance", "testing",
        "python", "json", "compression", "benchmark", "storage"
    ], k=random.randint(2, 5))
    
    # 生成订单项数组
    items = []
    for i in range(random.randint(1, 5)):
        items.append({
            "item_id": random.randint(1000, 9999),
            "product": random_string(10),
            "price": round(random.uniform(10.0, 500.0), 2),
            "quantity": random.randint(1, 10),
            "metadata": {
                "category": random.choice(["electronics", "clothing", "books", "home"]),
                "in_stock": random.choice([True, False])
            }
        })
    
    # 完整文档
    document = {
        "_id": f"doc_{doc_id:08d}",
        "user_id": random.randint(100000, 999999),
        "session_id": random_string(32),
        "created_at": random_date(),
        "updated_at": datetime.datetime.now(),
        "is_active": random.choice([True, False]),
        "priority": random.choice(["low", "medium", "high"]),
        "score": round(random.uniform(0.0, 100.0), 2),
        "count": random.randint(0, 1000),
        "balance": round(random.uniform(-1000.0, 10000.0), 2),
        "email": f"user{doc_id}@example.com",
        "phone": f"+1{random.randint(1000000000, 9999999999)}",
        "address": address,
        "tags": tags,
        "items": items,
        "metadata": {
            "source": random.choice(["web", "mobile", "api"]),
            "version": f"{random.randint(1, 5)}.{random.randint(0, 9)}.{random.randint(0, 9)}",
            "features": {
                "feature_a": random.choice([True, False]),
                "feature_b": random.choice([True, False]),
                "feature_c": random.choice([True, False])
            }
        },
        "raw_data": random_string(200)  # 模拟原始数据字段
    }
    
    return document

def generate_mongodb_test_file(
    output_file: str = "mongodb_test_data.json",
    target_size_mb: float = 100.0,
    documents_per_batch: int = 1000
) -> None:
    """
    生成 MongoDB 测试文件
    
    Args:
        output_file: 输出文件名
        target_size_mb: 目标文件大小(MB)
        documents_per_batch: 每批生成的文档数
    """
    print(f"开始生成 MongoDB 测试文件: {output_file}")
    print(f"目标大小: {target_size_mb} MB")
    
    target_size_bytes = int(target_size_mb * 1024 * 1024)
    current_size = 0
    doc_id = 0
    batch_count = 0
    
    with open(output_file, 'w', encoding='utf-8') as f:
        # 写入 JSON 数组开始
        f.write('[\n')
        
        while current_size < target_size_bytes:
            batch = []
            for _ in range(documents_per_batch):
                doc = generate_mongodb_document(doc_id)
                batch.append(json.dumps(doc, default=str))
                doc_id += 1
            
            # 写入批次
            if batch_count > 0:
                f.write(',\n')
            
            f.write(',\n'.join(batch))
            
            # 更新大小和计数
            batch_json = ',\n'.join(batch)
            current_size += len(batch_json.encode('utf-8'))
            batch_count += 1
            
            # 显示进度
            progress = (current_size / target_size_bytes) * 100
            print(f"\r进度: {progress:.1f}% ({current_size/1024/1024:.1f} MB)", end='', flush=True)
            
            # 检查是否达到目标大小
            if current_size >= target_size_bytes:
                break
        
        # 写入 JSON 数组结束
        f.write('\n]')
    
    # 最终文件信息
    actual_size = current_size / 1024 / 1024
    print(f"\n\n生成完成!")
    print(f"文件: {output_file}")
    print(f"大小: {actual_size:.2f} MB")
    print(f"文档数: {doc_id:,}")
    print(f"平均每文档大小: {actual_size/doc_id*1024:.2f} KB")

def generate_jsonl_file(
    output_file: str = "mongodb_test_data.jsonl",
    target_size_mb: float = 100.0
) -> None:
    """
    生成 JSON Lines 格式文件(每行一个文档,更适合 MongoDB 导入)
    """
    print(f"开始生成 MongoDB JSONL 测试文件: {output_file}")
    print(f"目标大小: {target_size_mb} MB")
    
    target_size_bytes = int(target_size_mb * 1024 * 1024)
    current_size = 0
    doc_id = 0
    
    with open(output_file, 'w', encoding='utf-8') as f:
        while current_size < target_size_bytes:
            doc = generate_mongodb_document(doc_id)
            line = json.dumps(doc, default=str) + '\n'
            f.write(line)
            
            current_size += len(line.encode('utf-8'))
            doc_id += 1
            
            # 显示进度
            if doc_id % 1000 == 0:
                progress = (current_size / target_size_bytes) * 100
                print(f"\r进度: {progress:.1f}% ({current_size/1024/1024:.1f} MB, {doc_id:,} 文档)", 
                      end='', flush=True)
    
    # 最终文件信息
    actual_size = current_size / 1024 / 1024
    print(f"\n\n生成完成!")
    print(f"文件: {output_file}")
    print(f"大小: {actual_size:.2f} MB")
    print(f"文档数: {doc_id:,}")
    print(f"平均每文档大小: {actual_size/doc_id*1024:.2f} KB")
    print(f"\n导入 MongoDB 命令:")
    print(f"mongoimport --db test --collection docs --file {output_file} --jsonArray")

if __name__ == "__main__":
    # 生成 JSON 数组格式(适合压缩测试)
    generate_mongodb_test_file(
        output_file="mongodb_test_data.json",
        target_size_mb=100.0
    )
    
    print("\n" + "="*60 + "\n")
    
    # 生成 JSON Lines 格式(适合 MongoDB 导入)
    generate_jsonl_file(
        output_file="mongodb_test_data.jsonl",
        target_size_mb=100.0
    )

{
  "_id": "doc_00001234",
  "user_id": 567890,
  "session_id": "AbCdEfGhIjKlMnOpQrStUvWxYz123456",
  "created_at": "2023-05-15T00:00:00",
  "is_active": true,
  "priority": "high",
  "score": 87.5,
  "balance": 1234.56,
  "email": "user1234@example.com",
  "address": {
    "street": "123 Main St",
    "city": "New York",
    "coordinates": {"lat": 40.7128, "lng": -74.0060}
  },
  "tags": ["mongodb", "performance", "testing"],
  "items": [
    {
      "item_id": 5678,
      "product": "WidgetX",
      "price": 99.99,
      "metadata": {"category": "electronics"}
    }
  ]
}

单条文档示例:

{
  "_id": "doc_00001234",
  "user_id": 567890,
  "session_id": "AbCdEfGhIjKlMnOpQrStUvWxYz123456",
  "created_at": "2023-05-15T00:00:00",
  "is_active": true,
  "priority": "high",
  "score": 87.5,
  "balance": 1234.56,
  "email": "user1234@example.com",
  "address": {
    "street": "123 Main St",
    "city": "New York",
    "coordinates": {"lat": 40.7128, "lng": -74.0060}
  },
  "tags": ["mongodb", "performance", "testing"],
  "items": [
    {
      "item_id": 5678,
      "product": "WidgetX",
      "price": 99.99,
      "metadata": {"category": "electronics"}
    }
  ]
}

测试方法

使用Deepseek生成测试脚本,具体如下:

import time
import snappy
import zstandard as zstd
import zlib
import sys

# ==================== 配置参数 ====================
TEST_FILE_PATH = "/Users/lilj/Desktop/work_python/day_1/mongodb_test_data.jsonl"
TEST_TIMES = 10
ZSTD_LEVEL = 3
ZLIB_LEVEL = 3
# ==================================================

def read_test_file(file_path):
    try:
        with open(file_path, "rb") as f:
            return f.read()
    except FileNotFoundError:
        print(f"❌ 错误:文件未找到:\n{file_path}")
        sys.exit(1)

def run_single_test(compress_func, decompress_func, data):
    """运行单次测试"""
    start = time.time()
    compressed = compress_func(data)
    compress_time = time.time() - start
    
    start = time.time()
    decompressed = decompress_func(compressed)
    decompress_time = time.time() - start
    
    if data != decompressed:
        raise ValueError("解压失败,数据不一致!")
    
    return compress_time, decompress_time, len(compressed)

def main():
    print("=" * 80)
    print("压缩算法性能测试(10次取平均值)")
    print(f"测试文件:{TEST_FILE_PATH}")
    print("=" * 80)
    
    data = read_test_file(TEST_FILE_PATH)
    original_size = len(data)
    print(f"原始文件大小:{original_size / 1024 / 1024:.2f} MB\n")
    
    # 初始化结果
    snappy_compress_times = []
    snappy_decompress_times = []
    snappy_compressed_size = None
    
    zstd_compress_times = []
    zstd_decompress_times = []
    zstd_compressed_size = None
    
    zlib_compress_times = []
    zlib_decompress_times = []
    zlib_compressed_size = None
    
    zstd_cctx = zstd.ZstdCompressor(level=ZSTD_LEVEL)
    zstd_dctx = zstd.ZstdDecompressor()
    
    for i in range(TEST_TIMES):
        print(f"正在运行第 {i+1}/{TEST_TIMES} 次测试...")
        
        # Snappy
        ct, dt, csize = run_single_test(
            lambda d: snappy.compress(d),
            lambda c: snappy.decompress(c),
            data
        )
        snappy_compress_times.append(ct)
        snappy_decompress_times.append(dt)
        if snappy_compressed_size is None:
            snappy_compressed_size = csize
        
        # Zstd
        ct, dt, csize = run_single_test(
            lambda d: zstd_cctx.compress(d),
            lambda c: zstd_dctx.decompress(c),
            data
        )
        zstd_compress_times.append(ct)
        zstd_decompress_times.append(dt)
        if zstd_compressed_size is None:
            zstd_compressed_size = csize
        
        # zlib
        ct, dt, csize = run_single_test(
            lambda d: zlib.compress(d, level=ZLIB_LEVEL),
            lambda c: zlib.decompress(c),
            data
        )
        zlib_compress_times.append(ct)
        zlib_decompress_times.append(dt)
        if zlib_compressed_size is None:
            zlib_compressed_size = csize
    
    # 计算平均值
    avg_snappy_compress = sum(snappy_compress_times) / TEST_TIMES * 1000
    avg_snappy_decompress = sum(snappy_decompress_times) / TEST_TIMES * 1000
    snappy_ratio = original_size / snappy_compressed_size
    
    avg_zstd_compress = sum(zstd_compress_times) / TEST_TIMES * 1000
    avg_zstd_decompress = sum(zstd_decompress_times) / TEST_TIMES * 1000
    zstd_ratio = original_size / zstd_compressed_size
    
    avg_zlib_compress = sum(zlib_compress_times) / TEST_TIMES * 1000
    avg_zlib_decompress = sum(zlib_decompress_times) / TEST_TIMES * 1000
    zlib_ratio = original_size / zlib_compressed_size
    
    # 输出结果
    print("\n" + "=" * 80)
    print("最终测试结果(10次平均值)")
    print("=" * 80)
    print(f"{'算法':<12} {'平均压缩耗时(ms)':<18} {'平均解压耗时(ms)':<18} {'压缩比':<10} {'压缩后大小(MB)'}")
    print("-" * 75)
    print(f"{'Snappy':<12} {avg_snappy_compress:<18.2f} {avg_snappy_decompress:<18.2f} {snappy_ratio:<10.2f} {snappy_compressed_size/1024/1024:.2f}")
    print(f"{'Zstd(l3)':<12} {avg_zstd_compress:<18.2f} {avg_zstd_decompress:<18.2f} {zstd_ratio:<10.2f} {zstd_compressed_size/1024/1024:.2f}")
    print(f"{'zlib(l3)':<12} {avg_zlib_compress:<18.2f} {avg_zlib_decompress:<18.2f} {zlib_ratio:<10.2f} {zlib_compressed_size/1024/1024:.2f}")
    
    print("\n💡 说明:")
    print("1. 压缩比 = 原始大小 / 压缩后大小(数值越大压缩效果越好)")
    print("2. 测试已验证解压数据与原始文件完全一致")

if __name__ == "__main__":
    main()

运行脚本:

#第一次
================================================================================
压缩算法性能测试(10次取平均值)
测试文件:/Users/lilj/Desktop/work_python/day_1/mongodb_test_data.jsonl
================================================================================
原始文件大小:100.00 MB

正在运行第 1/10 次测试...
正在运行第 2/10 次测试...
正在运行第 3/10 次测试...
正在运行第 4/10 次测试...
正在运行第 5/10 次测试...
正在运行第 6/10 次测试...
正在运行第 7/10 次测试...
正在运行第 8/10 次测试...
正在运行第 9/10 次测试...
正在运行第 10/10 次测试...

================================================================================
最终测试结果(10次平均值)
================================================================================
算法           平均压缩耗时(ms)         平均解压耗时(ms)         压缩比        压缩后大小(MB)
---------------------------------------------------------------------------
Snappy       244.49             135.96             2.32       43.15
Zstd(l3)     261.45             74.14              3.55       28.16
zlib(l3)     985.68             278.56             3.26       30.63

💡 说明:
1. 压缩比 = 原始大小 / 压缩后大小(数值越大压缩效果越好)
2. 测试已验证解压数据与原始文件完全一致

#第二次
================================================================================
压缩算法性能测试(10次取平均值)
测试文件:/Users/lilj/Desktop/work_python/day_1/mongodb_test_data.jsonl
================================================================================
原始文件大小:100.00 MB

正在运行第 1/10 次测试...
正在运行第 2/10 次测试...
正在运行第 3/10 次测试...
正在运行第 4/10 次测试...
正在运行第 5/10 次测试...
正在运行第 6/10 次测试...
正在运行第 7/10 次测试...
正在运行第 8/10 次测试...
正在运行第 9/10 次测试...
正在运行第 10/10 次测试...

================================================================================
最终测试结果(10次平均值)
================================================================================
算法           平均压缩耗时(ms)         平均解压耗时(ms)         压缩比        压缩后大小(MB)
---------------------------------------------------------------------------
Snappy       153.94             143.12             2.32       43.15
Zstd(l3)     270.58             77.32              3.55       28.16
zlib(l3)     1074.32            331.38             3.26       30.63

💡 说明:
1. 压缩比 = 原始大小 / 压缩后大小(数值越大压缩效果越好)
2. 测试已验证解压数据与原始文件完全一致

测试结果

汇总数据(10 次平均)

算法平均压缩耗时(ms)平均解压耗时(ms)压缩比压缩后大小(MB)
Snappy199.22139.542.3243.15
Zstd(l3)266.0275.733.5528.16
zlib(l3)1030.00304.973.2630.63

存储节省对比

算法100MB数据实际占用节省空间
不压缩100 MB0%
Snappy43.15 MB57%
Zstd28.16 MB72%
zlib30.63 MB69%

性能影响

算法写入延迟增加影响程度
Snappy+199ms🟢 极小
Zstd+266ms🟡 可接受
zlib+1030ms🔴 严重

读取性能提升

算法网络传输时间减少解压时间增加净收益
Snappy57%+140ms🟡 持平
Zstd72%+76ms🟢 正收益
zlib69%+305ms🔴 负收益

结果分析

✅ Zstd 是 MongoDB 文档场景的最优解

  • 压缩率最高(3.55x)
  • 解压速度最快(76ms,优于 Snappy)
  • 压缩速度略慢于 Snappy,但在大多数业务可接受范围内

⚠️ Snappy 并非“过时”

  • 在 高频写入 / 热集合​ 场景中,Snappy 仍是稳妥选择
  • CPU 敏感型业务可优先考虑 Snappy

❌ zlib 性价比最低

  • 压缩慢、解压慢、压缩率不如 Zstd
  • 仅建议在 必须兼容 gzip 的老系统​ 中使用

线上部署

在一次生产集群扩容受限的场景中:

指标切换前(snappy)切换后(zstd)
磁盘使用率95%55%
CPU 使用率基线无明显上升
写入延迟基线小幅增加(可接受)

✅ 在不增加硬件成本的前提下,显著提升了存储上限。

写在最后

大家都在拼算力、拼集群、拼调度,但真正悄悄烧钱的,往往是“文件格式 + 压缩算法”这种底层细节。

文件格式 + 压缩算法,是隐形成本高地

在 MongoDB 文档场景下:

  • Zstd level 3​ 在存储节省、解压性能、CPU 消耗之间取得最佳平衡
  • 适合作为新业务的默认选择

生产变更前,建议在小流量集合验证,时刻监控磁盘、CPU、写入延迟。

在 MongoDB 文档场景下,Zstd 是存储成本与性能之间的最优解;Snappy 则是高频写入场景的安全选择。

留下回复

error: Content is protected !!