在迭代一个机器学习模型时,最混乱的环节往往不是模型训练,而是特征工程。我们团队的特征脚本散落在各个代码仓库,同一个特征可能有多个实现版本,没人能说清线上模型用的究竟是哪个版本的逻辑。当模型效果出现波动时,追溯问题就像一场噩梦:是数据源变了,还是某个特征的计算逻辑被悄悄修改了?整个流程缺乏版本控制、可观测性,并且高度耦合,每次新增一个特征都可能影响到现有逻辑,这在生产环境中是不可接受的。
我们的目标是构建一个独立的特征提取服务组件。它必须满足几个核心要求:
- 可插拔: 新增或修改特征逻辑,不应影响引擎核心代码。
- 可复现: 必须能精确追溯任意一次预测所使用的特征逻辑及其版本。
- 高性能: 特征提取与查询必须是低延迟的,以满足在线服务的需求。
- 可观测: 每一个特征的计算过程、耗时、成功与否都必须有详细的、可查询的日志。
基于这些痛点,我们构思了一个结合多种技术的解决方案。存储层,我们放弃了需要独立部署的Redis或关系型数据库,选择了嵌入式的LevelDB
,它提供了极致的读写性能和简单的Key-Value模型,非常适合作为本地特征缓存。为了实现逻辑的可插拔,经典的设计模式
(策略模式、工厂模式、装饰器模式)是最佳选择。对于版本化和可复现性,DVC
(Data Version Control) 能像Git管理代码一样管理我们的特征定义和参数,这是解决问题的关键。最后,为了实现深度可观测性,我们将采用Loki
进行结构化日志聚合,通过标签系统实现对特征性能的精细化监控。
这是一个从混乱走向规范的重构过程。下面是我们的实现日志。
架构设计与技术选型
整个引擎的核心是一个主进程,它负责加载所有独立的特征提取插件、管理LevelDB连接、并处理输入请求。
graph TD subgraph Feature Extraction Engine A[Input Request] --> B{Engine Core} B --> C[Plugin Factory] C -- Loads --> D1[Plugin: [email protected]] C -- Loads --> D2[Plugin: [email protected]] C -- Loads --> D3[Plugin: [email protected]] subgraph Decorators L[Logging Decorator] -- Wraps --> V[Versioning Decorator] V -- Wraps --> P[Base Plugin Logic] end B -- Dispatches to --> L L -- Writes Structured Log --> E[JSON Log File] B -- Caches Result --> F[LevelDB Instance] B -- Reads from Cache --> F F -- Persists to --> G[./feature_store.db] end subgraph DVC Workspace H[dvc.yaml] -- Defines Stages --> I[params.yaml] I -- Configures --> J[plugins/*.py] end subgraph Observability Stack K[Promtail Agent] -- Tailing --> E K -- Pushes to --> M[Loki Server] N[Grafana] -- Queries (LogQL) --> M end J -- Tracked by --> H style D1 fill:#f9f,stroke:#333,stroke-width:2px style D2 fill:#f9f,stroke:#333,stroke-width:2px style D3 fill:#f9f,stroke:#333,stroke-width:2px
1. 存储层 - LevelDB:
我们选择plyvel
作为LevelDB的Python客户端。它的优势在于零配置、嵌入式、性能极高。对于特征缓存这种写一次、可能多次读的场景,LSM-Tree结构提供了非常高的写吞吐。由于我们的服务是单机组件,避免了网络开销和外部依赖。
2. 插件化核心 - 设计模式:
- 策略模式 (Strategy Pattern): 每个特征提取器都是一个独立的策略。引擎本身不关心具体的计算逻辑,只负责调用策略接口。这使得我们可以独立开发、测试和部署每个特征。
- 工厂模式 (Factory Pattern): 我们会有一个工厂类,负责扫描
plugins
目录,动态加载所有合法的特征提取器,并根据名称实例化它们。 - 装饰器模式 (Decorator Pattern): 这是实现横切关注点(如日志、缓存、版本校验)的优雅方式。我们将创建一个或多个装饰器来包装原始的特征提取逻辑,注入额外的功能,而无需修改特征代码本身。
3. 版本控制 - DVC:
DVC将我们的特征工程代码(.py
文件)和参数(params.yaml
)纳入版本管理。dvc.yaml
会定义一个“物化”特征配置的阶段,它能确保我们使用的每个插件都来自一个确定的Git提交。引擎在启动时会读取DVC生成的dvc.lock
文件或元数据,来获知每个插件的精确版本。
4. 可观测性 - Loki:
我们会将日志以JSON格式输出,其中包含关键字段,如plugin_name
, plugin_version
, execution_time_ms
, status
, request_id
等。Promtail(Loki的日志收集代理)会监控这个日志文件,解析JSON并将其中的字段提升为Loki的标签。这使得我们可以在Grafana中使用LogQL进行强大的查询,例如:sum by (plugin_name) (rate({job="feature_engine"} | json | unwrap execution_time_ms [5m]))
来计算每个特征的平均执行时间。
步骤化实现
1. 项目结构
一个良好组织的目录结构是项目的起点。
.
├── .dvc/ # DVC元数据目录
├── configs/
│ └── promtail-config.yaml # Promtail 配置文件
├── data/
│ └── feature_store.db/ # LevelDB 数据库文件
├── logs/
│ └── engine.log # 结构化日志输出
├── plugins/ # 特征插件目录
│ ├── __init__.py
│ ├── base_extractor.py # 插件抽象基类
│ ├── user_profile_extractor.py
│ └── item_popularity_extractor.py
├── core/
│ ├── __init__.py
│ ├── engine.py # 核心引擎
│ ├── factory.py # 插件工厂
│ └── decorators.py # 功能装饰器
├── main.py # 应用入口
├── dvc.yaml # DVC pipeline 定义
├── params.yaml # DVC 参数文件
└── requirements.txt
2. 插件接口与实现 (策略模式)
我们首先定义所有插件必须遵守的接口。
plugins/base_extractor.py
:
# plugins/base_extractor.py
import abc
from typing import Dict, Any
class BaseFeatureExtractor(abc.ABC):
"""
所有特征提取器插件的抽象基类 (策略接口)。
每个实现类代表一种具体的特征提取策略。
"""
# name 属性必须由子类定义,用于唯一标识插件
@property
@abc.abstractmethod
def name(self) -> str:
pass
# version 属性也必须由子类定义,与 DVC 版本关联
@property
@abc.abstractmethod
def version(self) -> str:
pass
@abc.abstractmethod
def extract(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
"""
核心提取方法。
:param user_input: 来自请求的原始输入数据。
:return: 一个包含提取出的特征的字典。
"""
pass
def get_identifier(self) -> str:
"""返回插件的唯一标识符,格式为 'name@version'"""
return f"{self.name}@{self.version}"
接下来是两个具体的插件实现。
plugins/user_profile_extractor.py
:
# plugins/user_profile_extractor.py
import time
from typing import Dict, Any
from .base_extractor import BaseFeatureExtractor
class UserProfileExtractor(BaseFeatureExtractor):
"""
提取用户基本画像特征,例如年龄段、注册时长。
"""
@property
def name(self) -> str:
return "user_profile"
@property
def version(self) -> str:
# 这个版本号应该由DVC或配置管理,这里为了演示先硬编码
return "1.1.0"
def extract(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
# 这里的实现逻辑在真实项目中会更复杂,可能需要查询其他服务或数据库
# 为演示目的,我们进行简单计算
if "user_age" not in user_input or "registration_timestamp" not in user_input:
raise ValueError("Missing required fields 'user_age' or 'registration_timestamp'")
age = user_input["user_age"]
age_bucket = "unknown"
if 18 <= age < 25:
age_bucket = "18-24"
elif 25 <= age < 35:
age_bucket = "25-34"
elif age >= 35:
age_bucket = "35+"
days_since_registration = (time.time() - user_input["registration_timestamp"]) / (24 * 3600)
return {
"f_user_age_bucket": age_bucket,
"f_user_days_since_registration": round(days_since_registration, 2),
}
3. DVC 版本化管理
现在我们用DVC来管理这些插件和它们的参数。假设我们的特征计算依赖一些参数。
params.yaml
:
user_profile:
version: "1.1.0"
age_buckets:
- [18, 24]
- [25, 34]
- [35, 99]
item_popularity:
version: "0.9.0"
time_window_hours: 72
dvc.yaml
:
stages:
prepare_plugins:
cmd: python scripts/prepare_plugins.py
deps:
- plugins/
- scripts/prepare_plugins.py
params:
- user_profile
- item_popularity
outs:
- prepared_config.json
这个prepare_plugins.py
脚本(此处省略其代码)的作用是读取params.yaml
,并生成一个锁定版本的配置文件prepared_config.json
。这个文件会被引擎读取,确保加载的插件版本和参数与DVC追踪的一致。当plugins/
目录下的代码或params.yaml
发生变化时,运行dvc repro
就会重新生成prepared_config.json
,并将结果记录在dvc.lock
中,实现了完全的可复现性。
4. 功能装饰器与插件工厂 (装饰器与工厂模式)
我们需要一个装饰器来添加日志和性能监控。
core/decorators.py
:
# core/decorators.py
import time
import json
import logging
from functools import wraps
from typing import Callable, Any
from plugins.base_extractor import BaseFeatureExtractor
# 配置一个专用的logger,用于输出结构化日志
# 在生产环境中,应该使用更复杂的日志配置
loki_logger = logging.getLogger("loki_structured_logger")
loki_logger.setLevel(logging.INFO)
handler = logging.FileHandler("logs/engine.log")
formatter = logging.Formatter('%(message)s') # 只记录原始json消息
handler.setFormatter(formatter)
loki_logger.addHandler(handler)
def observable(func: Callable) -> Callable:
"""
装饰器,为特征提取器的 extract 方法添加可观测性。
它记录执行时间、状态,并以JSON格式输出日志。
"""
@wraps(func)
def wrapper(self: BaseFeatureExtractor, *args, **kwargs) -> Any:
start_time = time.perf_counter()
status = "success"
result = None
error_message = ""
try:
result = func(self, *args, **kwargs)
return result
except Exception as e:
status = "error"
error_message = str(e)
# 在生产环境中,应该重新抛出异常或有更精细的错误处理策略
raise
finally:
end_time = time.perf_counter()
duration_ms = (end_time - start_time) * 1000
log_payload = {
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S.%fZ', time.gmtime()),
"level": "info" if status == "success" else "error",
"message": f"Feature extraction {status}",
"plugin_name": self.name,
"plugin_version": self.version,
"plugin_identifier": self.get_identifier(),
"execution_time_ms": round(duration_ms, 3),
"status": status,
"error": error_message,
# 可以在这里添加 request_id 等上下文信息
}
loki_logger.info(json.dumps(log_payload))
return wrapper
现在是工厂类,它动态加载、实例化并用装饰器包装插件。
core/factory.py
:
# core/factory.py
import inspect
import pkgutil
from typing import Dict, Type
from plugins.base_extractor import BaseFeatureExtractor
from core.decorators import observable
class PluginFactory:
"""
插件工厂,负责发现、加载和实例化所有特征提取器插件。
"""
def __init__(self, plugin_package_path: str):
self.plugins: Dict[str, BaseFeatureExtractor] = {}
self._load_plugins(plugin_package_path)
def _load_plugins(self, package_path: str):
"""动态加载指定路径下的所有插件。"""
# 导入包以使其可被发现
import plugins
for _, name, _ in pkgutil.iter_modules(plugins.__path__):
module = __import__(f"plugins.{name}", fromlist=["*"])
for item_name in dir(module):
item = getattr(module, item_name)
if inspect.isclass(item) and issubclass(item, BaseFeatureExtractor) and item is not BaseFeatureExtractor:
instance = item()
# 关键步骤:使用装饰器包装 extract 方法
instance.extract = observable(instance.extract)
self.plugins[instance.name] = instance
print(f"Loaded plugin: {instance.get_identifier()}")
def get_plugin(self, name: str) -> BaseFeatureExtractor:
"""根据名称获取一个插件实例。"""
plugin = self.plugins.get(name)
if not plugin:
raise KeyError(f"Plugin '{name}' not found.")
return plugin
def get_all_plugins(self) -> list[BaseFeatureExtractor]:
"""获取所有已加载的插件实例。"""
return list(self.plugins.values())
5. 核心引擎与LevelDB集成
core/engine.py
:
# core/engine.py
import plyvel
import json
from typing import Dict, Any, List
from core.factory import PluginFactory
class FeatureEngine:
def __init__(self, db_path: str, plugin_package_path: str = "plugins"):
self.db_path = db_path
try:
# 创建或打开LevelDB数据库
self.db = plyvel.DB(self.db_path, create_if_missing=True)
except plyvel.Error as e:
# 在真实项目中,这里需要更健壮的错误处理和重试机制
print(f"Fatal: Could not open LevelDB at {db_path}. Error: {e}")
raise
print("Initializing plugin factory...")
self.factory = PluginFactory(plugin_package_path)
self.plugins = self.factory.get_all_plugins()
def process(self, user_id: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""
处理单个请求,提取并缓存所有特征。
"""
all_features = {}
for plugin in self.plugins:
plugin_id = plugin.get_identifier()
# 构造缓存key,包含用户ID和插件标识符,确保版本更新后缓存失效
cache_key = f"user:{user_id}:plugin:{plugin_id}".encode('utf-8')
cached_features = self.db.get(cache_key)
if cached_features:
features = json.loads(cached_features.decode('utf-8'))
print(f"Cache hit for {cache_key.decode('utf-8')}")
else:
print(f"Cache miss for {cache_key.decode('utf-8')}. Running extraction...")
try:
features = plugin.extract(request_data)
# 将提取的特征存入LevelDB,设置一个TTL可能更好,但plyvel本身不支持
self.db.put(cache_key, json.dumps(features).encode('utf-8'))
except Exception as e:
# 装饰器已经记录了错误,这里只打印到控制台
print(f"Error extracting features with {plugin_id}: {e}")
# 根据业务需求决定是否继续,或者返回部分特征
continue
all_features.update(features)
return all_features
def close(self):
"""安全关闭数据库连接。"""
if self.db and not self.db.closed:
self.db.close()
print("LevelDB connection closed.")
6. Loki 集成与可观测性
我们的日志已经以JSON格式写入logs/engine.log
。现在配置Promtail来采集它们。
configs/promtail-config.yaml
:
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://localhost:3100/loki/api/v1/push # Loki server address
scrape_configs:
- job_name: feature_engine
static_configs:
- targets:
- localhost
labels:
job: feature_engine
__path__: ./logs/engine.log # 监控的日志文件路径
pipeline_stages:
- json:
expressions:
level: level
message: message
plugin_name: plugin_name
plugin_version: plugin_version
status: status
- labels:
level:
plugin_name:
plugin_version:
status:
这个配置告诉Promtail:
- 监控
./logs/engine.log
文件。 - 将每一行日志解析为JSON。
- 从JSON中提取
level
,plugin_name
,plugin_version
,status
字段,并将它们作为Loki的索引标签。 - 将日志推送到本地的Loki实例。
启动Loki和Promtail后,我们就可以在Grafana中使用LogQL查询了。
- 查询某个插件的所有错误日志:
{job="feature_engine", plugin_name="user_profile", status="error"}
- 计算过去1小时内各插件的错误率:
sum by (plugin_name) (count_over_time({job="feature_engine", status="error"}[1h])) / sum by (plugin_name) (count_over_time({job="feature_engine"}[1h]))
7. 组装与运行
最后,main.py
将所有组件串联起来。
main.py
:
# main.py
from core.engine import FeatureEngine
import time
import random
def main():
DB_PATH = "./data/feature_store.db"
print("Starting Feature Extraction Engine...")
engine = FeatureEngine(db_path=DB_PATH)
# 模拟一些请求
sample_requests = [
{
"user_id": "user-123",
"data": {"user_age": 32, "registration_timestamp": time.time() - 86400 * 150}
},
{
"user_id": "user-456",
"data": {"user_age": 21, "registration_timestamp": time.time() - 86400 * 20}
},
{
"user_id": "user-123", # 再次请求,应该会命中缓存
"data": {"user_age": 32, "registration_timestamp": time.time() - 86400 * 150}
},
{
"user_id": "user-789", # 模拟一个会出错的请求
"data": {"registration_timestamp": time.time() - 86400 * 5}
}
]
try:
for req in sample_requests:
print("-" * 50)
print(f"Processing request for {req['user_id']}...")
features = engine.process(req['user_id'], req['data'])
print(f"Extracted Features for {req['user_id']}: {features}")
time.sleep(0.5)
finally:
engine.close()
if __name__ == "__main__":
main()
运行这个程序,我们会看到插件被加载,特征被提取和缓存,并且logs/engine.log
文件中充满了结构化的日志,等待被Loki采集。
局限性与未来迭代方向
我们构建的这个引擎解决了一开始提出的所有核心痛点。它通过设计模式实现了松耦合和高扩展性,利用LevelDB提供了高性能的本地缓存,借助DVC保证了特征逻辑的可复现性,并通过Loki获得了强大的可观测能力。
然而,这个方案并非银弹,它也有明确的适用边界和局限性:
- 单机限制: LevelDB是嵌入式数据库,有写锁,这意味着整个引擎是单进程、单线程写入的。它无法水平扩展。若要支持分布式或高并发写入,存储层需要替换为Redis Cluster, TiKV, 或其他分布式K/V存储。
- 插件加载机制: 目前是从本地文件系统动态加载。一个更成熟的系统可能需要一个插件注册中心,或者从S3、Git仓库等远程源安全地拉取和加载特定版本的插件包。
- 缓存策略: 当前的缓存永不过期,除非插件版本变化。在真实场景中,需要更复杂的缓存失效策略,例如基于时间的TTL(Time-To-Live)或基于事件的失效(例如,当用户的原始数据更新时)。
- DVC工作流集成: DVC的能力需要与CI/CD流程紧密集成才能发挥最大价值。例如,当一个特征插件的PR被合并时,CI流水线应自动运行
dvc repro
,并将新生成的dvc.lock
和prepared_config.json
打包到服务部署的制品中。这需要额外的工程实践来落地。