构建融合设计模式 LevelDB DVC与Loki的版本化特征提取引擎


在迭代一个机器学习模型时,最混乱的环节往往不是模型训练,而是特征工程。我们团队的特征脚本散落在各个代码仓库,同一个特征可能有多个实现版本,没人能说清线上模型用的究竟是哪个版本的逻辑。当模型效果出现波动时,追溯问题就像一场噩梦:是数据源变了,还是某个特征的计算逻辑被悄悄修改了?整个流程缺乏版本控制、可观测性,并且高度耦合,每次新增一个特征都可能影响到现有逻辑,这在生产环境中是不可接受的。

我们的目标是构建一个独立的特征提取服务组件。它必须满足几个核心要求:

  1. 可插拔: 新增或修改特征逻辑,不应影响引擎核心代码。
  2. 可复现: 必须能精确追溯任意一次预测所使用的特征逻辑及其版本。
  3. 高性能: 特征提取与查询必须是低延迟的,以满足在线服务的需求。
  4. 可观测: 每一个特征的计算过程、耗时、成功与否都必须有详细的、可查询的日志。

基于这些痛点,我们构思了一个结合多种技术的解决方案。存储层,我们放弃了需要独立部署的Redis或关系型数据库,选择了嵌入式的LevelDB,它提供了极致的读写性能和简单的Key-Value模型,非常适合作为本地特征缓存。为了实现逻辑的可插拔,经典的设计模式(策略模式、工厂模式、装饰器模式)是最佳选择。对于版本化和可复现性,DVC (Data Version Control) 能像Git管理代码一样管理我们的特征定义和参数,这是解决问题的关键。最后,为了实现深度可观测性,我们将采用Loki进行结构化日志聚合,通过标签系统实现对特征性能的精细化监控。

这是一个从混乱走向规范的重构过程。下面是我们的实现日志。

架构设计与技术选型

整个引擎的核心是一个主进程,它负责加载所有独立的特征提取插件、管理LevelDB连接、并处理输入请求。

graph TD
    subgraph Feature Extraction Engine
        A[Input Request] --> B{Engine Core}
        B --> C[Plugin Factory]
        C -- Loads --> D1[Plugin: [email protected]]
        C -- Loads --> D2[Plugin: [email protected]]
        C -- Loads --> D3[Plugin: [email protected]]

        subgraph Decorators
            L[Logging Decorator] -- Wraps --> V[Versioning Decorator]
            V -- Wraps --> P[Base Plugin Logic]
        end

        B -- Dispatches to --> L

        L -- Writes Structured Log --> E[JSON Log File]

        B -- Caches Result --> F[LevelDB Instance]
        B -- Reads from Cache --> F
        F -- Persists to --> G[./feature_store.db]
    end

    subgraph DVC Workspace
        H[dvc.yaml] -- Defines Stages --> I[params.yaml]
        I -- Configures --> J[plugins/*.py]
    end

    subgraph Observability Stack
        K[Promtail Agent] -- Tailing --> E
        K -- Pushes to --> M[Loki Server]
        N[Grafana] -- Queries (LogQL) --> M
    end

    J -- Tracked by --> H
    style D1 fill:#f9f,stroke:#333,stroke-width:2px
    style D2 fill:#f9f,stroke:#333,stroke-width:2px
    style D3 fill:#f9f,stroke:#333,stroke-width:2px

1. 存储层 - LevelDB:
我们选择plyvel作为LevelDB的Python客户端。它的优势在于零配置、嵌入式、性能极高。对于特征缓存这种写一次、可能多次读的场景,LSM-Tree结构提供了非常高的写吞吐。由于我们的服务是单机组件,避免了网络开销和外部依赖。

2. 插件化核心 - 设计模式:

  • 策略模式 (Strategy Pattern): 每个特征提取器都是一个独立的策略。引擎本身不关心具体的计算逻辑,只负责调用策略接口。这使得我们可以独立开发、测试和部署每个特征。
  • 工厂模式 (Factory Pattern): 我们会有一个工厂类,负责扫描plugins目录,动态加载所有合法的特征提取器,并根据名称实例化它们。
  • 装饰器模式 (Decorator Pattern): 这是实现横切关注点(如日志、缓存、版本校验)的优雅方式。我们将创建一个或多个装饰器来包装原始的特征提取逻辑,注入额外的功能,而无需修改特征代码本身。

3. 版本控制 - DVC:
DVC将我们的特征工程代码(.py文件)和参数(params.yaml)纳入版本管理。dvc.yaml会定义一个“物化”特征配置的阶段,它能确保我们使用的每个插件都来自一个确定的Git提交。引擎在启动时会读取DVC生成的dvc.lock文件或元数据,来获知每个插件的精确版本。

4. 可观测性 - Loki:
我们会将日志以JSON格式输出,其中包含关键字段,如plugin_name, plugin_version, execution_time_ms, status, request_id等。Promtail(Loki的日志收集代理)会监控这个日志文件,解析JSON并将其中的字段提升为Loki的标签。这使得我们可以在Grafana中使用LogQL进行强大的查询,例如:sum by (plugin_name) (rate({job="feature_engine"} | json | unwrap execution_time_ms [5m])) 来计算每个特征的平均执行时间。

步骤化实现

1. 项目结构

一个良好组织的目录结构是项目的起点。

.
├── .dvc/                  # DVC元数据目录
├── configs/
│   └── promtail-config.yaml # Promtail 配置文件
├── data/
│   └── feature_store.db/    # LevelDB 数据库文件
├── logs/
│   └── engine.log           # 结构化日志输出
├── plugins/                 # 特征插件目录
│   ├── __init__.py
│   ├── base_extractor.py    # 插件抽象基类
│   ├── user_profile_extractor.py
│   └── item_popularity_extractor.py
├── core/
│   ├── __init__.py
│   ├── engine.py            # 核心引擎
│   ├── factory.py           # 插件工厂
│   └── decorators.py        # 功能装饰器
├── main.py                  # 应用入口
├── dvc.yaml                 # DVC pipeline 定义
├── params.yaml              # DVC 参数文件
└── requirements.txt

2. 插件接口与实现 (策略模式)

我们首先定义所有插件必须遵守的接口。

plugins/base_extractor.py:

# plugins/base_extractor.py
import abc
from typing import Dict, Any

class BaseFeatureExtractor(abc.ABC):
    """
    所有特征提取器插件的抽象基类 (策略接口)。
    每个实现类代表一种具体的特征提取策略。
    """
    
    # name 属性必须由子类定义,用于唯一标识插件
    @property
    @abc.abstractmethod
    def name(self) -> str:
        pass

    # version 属性也必须由子类定义,与 DVC 版本关联
    @property
    @abc.abstractmethod
    def version(self) -> str:
        pass

    @abc.abstractmethod
    def extract(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """
        核心提取方法。
        :param user_input: 来自请求的原始输入数据。
        :return: 一个包含提取出的特征的字典。
        """
        pass

    def get_identifier(self) -> str:
        """返回插件的唯一标识符,格式为 'name@version'"""
        return f"{self.name}@{self.version}"

接下来是两个具体的插件实现。

plugins/user_profile_extractor.py:

# plugins/user_profile_extractor.py
import time
from typing import Dict, Any
from .base_extractor import BaseFeatureExtractor

class UserProfileExtractor(BaseFeatureExtractor):
    """
    提取用户基本画像特征,例如年龄段、注册时长。
    """
    
    @property
    def name(self) -> str:
        return "user_profile"

    @property
    def version(self) -> str:
        # 这个版本号应该由DVC或配置管理,这里为了演示先硬编码
        return "1.1.0"

    def extract(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        # 这里的实现逻辑在真实项目中会更复杂,可能需要查询其他服务或数据库
        # 为演示目的,我们进行简单计算
        if "user_age" not in user_input or "registration_timestamp" not in user_input:
            raise ValueError("Missing required fields 'user_age' or 'registration_timestamp'")

        age = user_input["user_age"]
        age_bucket = "unknown"
        if 18 <= age < 25:
            age_bucket = "18-24"
        elif 25 <= age < 35:
            age_bucket = "25-34"
        elif age >= 35:
            age_bucket = "35+"

        days_since_registration = (time.time() - user_input["registration_timestamp"]) / (24 * 3600)

        return {
            "f_user_age_bucket": age_bucket,
            "f_user_days_since_registration": round(days_since_registration, 2),
        }

3. DVC 版本化管理

现在我们用DVC来管理这些插件和它们的参数。假设我们的特征计算依赖一些参数。

params.yaml:

user_profile:
  version: "1.1.0"
  age_buckets:
    - [18, 24]
    - [25, 34]
    - [35, 99]

item_popularity:
  version: "0.9.0"
  time_window_hours: 72

dvc.yaml:

stages:
  prepare_plugins:
    cmd: python scripts/prepare_plugins.py
    deps:
    - plugins/
    - scripts/prepare_plugins.py
    params:
    - user_profile
    - item_popularity
    outs:
    - prepared_config.json

这个prepare_plugins.py脚本(此处省略其代码)的作用是读取params.yaml,并生成一个锁定版本的配置文件prepared_config.json。这个文件会被引擎读取,确保加载的插件版本和参数与DVC追踪的一致。当plugins/目录下的代码或params.yaml发生变化时,运行dvc repro就会重新生成prepared_config.json,并将结果记录在dvc.lock中,实现了完全的可复现性。

4. 功能装饰器与插件工厂 (装饰器与工厂模式)

我们需要一个装饰器来添加日志和性能监控。

core/decorators.py:

# core/decorators.py
import time
import json
import logging
from functools import wraps
from typing import Callable, Any
from plugins.base_extractor import BaseFeatureExtractor

# 配置一个专用的logger,用于输出结构化日志
# 在生产环境中,应该使用更复杂的日志配置
loki_logger = logging.getLogger("loki_structured_logger")
loki_logger.setLevel(logging.INFO)
handler = logging.FileHandler("logs/engine.log")
formatter = logging.Formatter('%(message)s') # 只记录原始json消息
handler.setFormatter(formatter)
loki_logger.addHandler(handler)


def observable(func: Callable) -> Callable:
    """
    装饰器,为特征提取器的 extract 方法添加可观测性。
    它记录执行时间、状态,并以JSON格式输出日志。
    """
    @wraps(func)
    def wrapper(self: BaseFeatureExtractor, *args, **kwargs) -> Any:
        start_time = time.perf_counter()
        status = "success"
        result = None
        error_message = ""
        
        try:
            result = func(self, *args, **kwargs)
            return result
        except Exception as e:
            status = "error"
            error_message = str(e)
            # 在生产环境中,应该重新抛出异常或有更精细的错误处理策略
            raise
        finally:
            end_time = time.perf_counter()
            duration_ms = (end_time - start_time) * 1000
            
            log_payload = {
                "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S.%fZ', time.gmtime()),
                "level": "info" if status == "success" else "error",
                "message": f"Feature extraction {status}",
                "plugin_name": self.name,
                "plugin_version": self.version,
                "plugin_identifier": self.get_identifier(),
                "execution_time_ms": round(duration_ms, 3),
                "status": status,
                "error": error_message,
                # 可以在这里添加 request_id 等上下文信息
            }
            loki_logger.info(json.dumps(log_payload))

    return wrapper

现在是工厂类,它动态加载、实例化并用装饰器包装插件。

core/factory.py:

# core/factory.py
import inspect
import pkgutil
from typing import Dict, Type
from plugins.base_extractor import BaseFeatureExtractor
from core.decorators import observable

class PluginFactory:
    """
    插件工厂,负责发现、加载和实例化所有特征提取器插件。
    """
    def __init__(self, plugin_package_path: str):
        self.plugins: Dict[str, BaseFeatureExtractor] = {}
        self._load_plugins(plugin_package_path)

    def _load_plugins(self, package_path: str):
        """动态加载指定路径下的所有插件。"""
        # 导入包以使其可被发现
        import plugins
        
        for _, name, _ in pkgutil.iter_modules(plugins.__path__):
            module = __import__(f"plugins.{name}", fromlist=["*"])
            for item_name in dir(module):
                item = getattr(module, item_name)
                if inspect.isclass(item) and issubclass(item, BaseFeatureExtractor) and item is not BaseFeatureExtractor:
                    instance = item()
                    
                    # 关键步骤:使用装饰器包装 extract 方法
                    instance.extract = observable(instance.extract)
                    
                    self.plugins[instance.name] = instance
                    print(f"Loaded plugin: {instance.get_identifier()}")

    def get_plugin(self, name: str) -> BaseFeatureExtractor:
        """根据名称获取一个插件实例。"""
        plugin = self.plugins.get(name)
        if not plugin:
            raise KeyError(f"Plugin '{name}' not found.")
        return plugin

    def get_all_plugins(self) -> list[BaseFeatureExtractor]:
        """获取所有已加载的插件实例。"""
        return list(self.plugins.values())

5. 核心引擎与LevelDB集成

core/engine.py:

# core/engine.py
import plyvel
import json
from typing import Dict, Any, List
from core.factory import PluginFactory

class FeatureEngine:
    def __init__(self, db_path: str, plugin_package_path: str = "plugins"):
        self.db_path = db_path
        try:
            # 创建或打开LevelDB数据库
            self.db = plyvel.DB(self.db_path, create_if_missing=True)
        except plyvel.Error as e:
            # 在真实项目中,这里需要更健壮的错误处理和重试机制
            print(f"Fatal: Could not open LevelDB at {db_path}. Error: {e}")
            raise
        
        print("Initializing plugin factory...")
        self.factory = PluginFactory(plugin_package_path)
        self.plugins = self.factory.get_all_plugins()

    def process(self, user_id: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理单个请求,提取并缓存所有特征。
        """
        all_features = {}
        
        for plugin in self.plugins:
            plugin_id = plugin.get_identifier()
            # 构造缓存key,包含用户ID和插件标识符,确保版本更新后缓存失效
            cache_key = f"user:{user_id}:plugin:{plugin_id}".encode('utf-8')

            cached_features = self.db.get(cache_key)
            if cached_features:
                features = json.loads(cached_features.decode('utf-8'))
                print(f"Cache hit for {cache_key.decode('utf-8')}")
            else:
                print(f"Cache miss for {cache_key.decode('utf-8')}. Running extraction...")
                try:
                    features = plugin.extract(request_data)
                    # 将提取的特征存入LevelDB,设置一个TTL可能更好,但plyvel本身不支持
                    self.db.put(cache_key, json.dumps(features).encode('utf-8'))
                except Exception as e:
                    # 装饰器已经记录了错误,这里只打印到控制台
                    print(f"Error extracting features with {plugin_id}: {e}")
                    # 根据业务需求决定是否继续,或者返回部分特征
                    continue
            
            all_features.update(features)
        
        return all_features
    
    def close(self):
        """安全关闭数据库连接。"""
        if self.db and not self.db.closed:
            self.db.close()
            print("LevelDB connection closed.")

6. Loki 集成与可观测性

我们的日志已经以JSON格式写入logs/engine.log。现在配置Promtail来采集它们。

configs/promtail-config.yaml:

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://localhost:3100/loki/api/v1/push # Loki server address

scrape_configs:
- job_name: feature_engine
  static_configs:
  - targets:
      - localhost
    labels:
      job: feature_engine
      __path__: ./logs/engine.log # 监控的日志文件路径

  pipeline_stages:
  - json:
      expressions:
        level: level
        message: message
        plugin_name: plugin_name
        plugin_version: plugin_version
        status: status
  - labels:
      level:
      plugin_name:
      plugin_version:
      status:

这个配置告诉Promtail:

  1. 监控./logs/engine.log文件。
  2. 将每一行日志解析为JSON。
  3. 从JSON中提取level, plugin_name, plugin_version, status字段,并将它们作为Loki的索引标签。
  4. 将日志推送到本地的Loki实例。

启动Loki和Promtail后,我们就可以在Grafana中使用LogQL查询了。

  • 查询某个插件的所有错误日志: {job="feature_engine", plugin_name="user_profile", status="error"}
  • 计算过去1小时内各插件的错误率: sum by (plugin_name) (count_over_time({job="feature_engine", status="error"}[1h])) / sum by (plugin_name) (count_over_time({job="feature_engine"}[1h]))

7. 组装与运行

最后,main.py将所有组件串联起来。

main.py:

# main.py
from core.engine import FeatureEngine
import time
import random

def main():
    DB_PATH = "./data/feature_store.db"
    
    print("Starting Feature Extraction Engine...")
    engine = FeatureEngine(db_path=DB_PATH)

    # 模拟一些请求
    sample_requests = [
        {
            "user_id": "user-123",
            "data": {"user_age": 32, "registration_timestamp": time.time() - 86400 * 150}
        },
        {
            "user_id": "user-456",
            "data": {"user_age": 21, "registration_timestamp": time.time() - 86400 * 20}
        },
        {
            "user_id": "user-123", # 再次请求,应该会命中缓存
            "data": {"user_age": 32, "registration_timestamp": time.time() - 86400 * 150}
        },
        {
            "user_id": "user-789", # 模拟一个会出错的请求
            "data": {"registration_timestamp": time.time() - 86400 * 5}
        }
    ]

    try:
        for req in sample_requests:
            print("-" * 50)
            print(f"Processing request for {req['user_id']}...")
            features = engine.process(req['user_id'], req['data'])
            print(f"Extracted Features for {req['user_id']}: {features}")
            time.sleep(0.5)
    finally:
        engine.close()

if __name__ == "__main__":
    main()

运行这个程序,我们会看到插件被加载,特征被提取和缓存,并且logs/engine.log文件中充满了结构化的日志,等待被Loki采集。

局限性与未来迭代方向

我们构建的这个引擎解决了一开始提出的所有核心痛点。它通过设计模式实现了松耦合和高扩展性,利用LevelDB提供了高性能的本地缓存,借助DVC保证了特征逻辑的可复现性,并通过Loki获得了强大的可观测能力。

然而,这个方案并非银弹,它也有明确的适用边界和局限性:

  1. 单机限制: LevelDB是嵌入式数据库,有写锁,这意味着整个引擎是单进程、单线程写入的。它无法水平扩展。若要支持分布式或高并发写入,存储层需要替换为Redis Cluster, TiKV, 或其他分布式K/V存储。
  2. 插件加载机制: 目前是从本地文件系统动态加载。一个更成熟的系统可能需要一个插件注册中心,或者从S3、Git仓库等远程源安全地拉取和加载特定版本的插件包。
  3. 缓存策略: 当前的缓存永不过期,除非插件版本变化。在真实场景中,需要更复杂的缓存失效策略,例如基于时间的TTL(Time-To-Live)或基于事件的失效(例如,当用户的原始数据更新时)。
  4. DVC工作流集成: DVC的能力需要与CI/CD流程紧密集成才能发挥最大价值。例如,当一个特征插件的PR被合并时,CI流水线应自动运行dvc repro,并将新生成的dvc.lockprepared_config.json打包到服务部署的制品中。这需要额外的工程实践来落地。

  目录