使用Haskell DSL统一批处理与时序特征源的混合特征存储架构决策


特征工程的复杂性不在于单一特征的计算,而在于保证在线服务与离线训练之间特征逻辑的绝对一致性,尤其是在一个同时依赖于海量批处理数据和高频时序数据的混合环境中。我们面临的挑战是:数据科学家用Python或SQL快速迭代特征逻辑,但这些定义往往是松散的、缺乏类型约束的,并且散落在不同的代码仓库和调度脚本中。当这些逻辑需要被工程团队用高性能语言(如Java/Go)为在线服务重新实现时,细微的偏差就会导致灾难性的训练-服务偏斜(Training-Serving Skew)。

问题的核心在于特征定义缺乏一个单一、可验证、强类型的“事实源头”(Source of Truth)。任何基于配置(YAML/JSON)或脚本语言的方案,本质上都是在推迟错误的发生,从开发期推迟到运行期。

方案A:基于Python与配置文件的行业主流方案

这是最常见的路径。通常,它涉及一个中心化的Python库,数据科学家使用它来定义特征。这些定义被序列化成JSON或YAML,然后由不同的执行引擎(如Spark、Flink、在线服务)来解析和执行。

优势分析

  1. 低上手门槛:团队中的绝大多数人熟悉Python,可以快速上手开发。生态系统成熟,有大量的库可以用于数据处理(Pandas, NumPy)。
  2. 灵活性高:脚本语言的动态性允许快速实验和迭代,无需漫长的编译周期。

劣势分析与生产事故复盘

这套方案的脆弱性在生产环境中暴露无遗。

  1. 类型安全缺失:一个典型的事故是,离线Spark任务处理的某个上游数据源字段从Long变成了Double。Python的动态类型对此毫无察觉,特征计算脚本正常运行,但生成的数据类型已经改变。在线服务的Java代码期望的是一个long,在接收到包含小数点的特征值时,要么直接抛出NumberFormatException,要么在强制类型转换后丢失精度,导致模型预测出现严重偏差。这个问题直到线上流量出现异常才被发现。
  2. 隐式逻辑不一致:SQL方言的差异是另一个陷阱。数据科学家在Hive上验证了一个窗口函数,但在线服务为了性能,需要将同样的逻辑在TimescaleDB中用PostgreSQL的窗口函数实现。两者对于NULL值的处理、边界条件的定义存在细微差别,导致在线和离线计算结果在某些边缘案例上不一致。
  3. 重构与维护的噩梦:当特征数量达到数千个,特征间的依赖关系错综复杂时,基于字符串的特征引用和配置文件变得极难维护。想要安全地修改一个被多个下游特征依赖的基础特征,需要进行全局的文本搜索和大量的人工回归测试,效率低下且风险极高。

这个方案的本质问题是它将“特征定义”视为“配置”或“脚本”,而我们认为,特征定义应该被视为“代码”,并且是系统中需要最严格保证正确性的核心代码。

方案B:基于Haskell DSL的强类型特征定义与代码生成

我们的替代方案是设计一个领域特定语言(DSL)来描述特征,并选择Haskell作为实现该DSL的宿主语言。这个决策的核心驱动力是Haskell提供的顶级类型系统和纯函数特性。

这个方案的核心思想是:

  1. 单一事实源头:所有特征定义都用Haskell DSL编写,存储在专门的代码仓库中。
  2. 编译时验证:利用Haskell的编译器(GHC)在编译期就验证特征定义的类型安全、依赖关系、逻辑完整性。例如,一个期望输入为Int的转换函数无法被应用到一个源为Text的特征上,这在编译时就会失败。
  3. 多目标代码生成:Haskell程序本身不直接计算特征,而是作为一个“特征编译器”。它解析DSL定义的特征,并为不同的目标平台(Haskell -> Target Code)生成相应的原生代码或查询语句,如Spark的Scala代码、Spark SQL查询、TimescaleDB的SQL查询,甚至在线服务所需的Java/Go代码片段。

优势分析

  1. 根除类型不一致:特征的数据类型、来源、转换逻辑都被编码在Haskell的类型系统中。任何不匹配都会导致编译失败,问题在开发阶段就被彻底解决。
  2. 保证逻辑等价性:由于所有平台的执行逻辑都由同一个Haskell程序生成,我们可以在生成器层面保证其语义等价性,从根本上消除了因人工翻译逻辑而引入的偏差。
  3. 重构的安全保障:当需要修改一个特征时,Haskell编译器会精确地指出所有受到影响的下游依赖。重构不再是盲人摸象,而是由编译器精确引导的外科手术。

劣势分析

  1. 技术栈引入成本:在团队中引入Haskell需要投入显著的培训成本,其陡峭的学习曲线可能会在短期内影响开发效率。
  2. 生态系统与集成:与Hadoop、Spark等生态的直接集成不如Python或JVM语言成熟,需要编写更多的胶水代码(例如,通过FFI调用C库,或者通过HTTP服务进行通信)。
  3. 表现力边界:DSL的设计需要预见未来可能出现的特征计算模式。对于某些极其复杂或非结构化的自定义转换(UDF),在强类型的DSL中表达它们可能会比在Python中直接写一个函数更为复杂。

最终架构选择与理由

尽管引入Haskell存在挑战,但从长期来看,为了保证一个数千个特征、上百个模型、每日PB级数据处理系统的稳定性和可维护性,前期的投入是值得的。我们选择方案B。其核心价值在于,它将特征工程的关注点从“如何修复运行时错误”转变为“如何设计一个不会产生运行时错误的系统”。这是一个架构层面的根本性转变。

核心实现概览

我们设计的混合特征存储系统的数据流和组件交互如下:

graph TD
    subgraph Offline Batch Processing
        HDFS[Hadoop HDFS] --> Spark
    end

    subgraph Real-time/Near-real-time
        Kafka[Kafka Topics] --> Flink[Flink Job] --> TSDB[(TimescaleDB)]
    end

    subgraph Feature Definition & Orchestration
        HaskellDSL[Haskell Feature DSL Repo] -- compile --> FeatureCompiler
        FeatureCompiler -- generates --> SparkSQL[Spark SQL Query]
        FeatureCompiler -- generates --> TimescaleSQL[TimescaleDB SQL Query]
        FeatureCompiler -- generates --> FeatureMetadata[Feature Metadata JSON]
    end

    subgraph Feature Storage & Discovery
        Spark -- writes --> OfflineStore[Offline Parquet Store]
        TSDB -- serves --> OnlineStore{Online Store}
        OfflineStore -- sync --> OnlineStore
        FeatureCompiler -- writes --> ES[Elasticsearch Index]
    end

    subgraph Serving
        MLService[ML Model Serving API] -- query --> OnlineStore
        MLService -- search --> ES
    end

    Airflow[Airflow DAG] -- triggers --> Spark[Spark Job]
    Spark -- uses --> SparkSQL
    Airflow -- triggers --> TimescaleSQL
  1. Haskell DSL: 定义特征的单一事实源头。
  2. Feature Compiler: 一个Haskell可执行程序,CI/CD流水线的一部分。当DSL代码合并到主分支时,它会自动运行,生成SQL查询和元数据。
  3. Hadoop/Spark: 负责处理存储在HDFS上的海量历史数据,执行由Haskell生成的Spark SQL,计算批处理特征,结果存储为Parquet格式。
  4. TimescaleDB: 存储需要进行时间窗口分析的事件流数据,如用户点击、交易记录等。它执行由Haskell生成的窗口函数SQL,为在线查询提供近实时的时序特征。
  5. Elasticsearch: 存储由Feature Compiler生成的元数据。数据科学家和ML工程师可以通过它来搜索、发现和理解已有的特征,包括特征的定义、来源、数据类型、版本和负责人。
  6. Online Store: 这是一个逻辑概念,物理上可能由TimescaleDB和另一个键值存储(如Redis)组合而成,用于服务线上模型的实时特征查询。

Haskell DSL 核心代码实现

以下是DSL设计的核心数据类型和代码生成器的简化示例。在真实项目中,这些定义会复杂得多,但核心思想是一致的。

-- src/FeatureStore/DSL.hs

{-# LANGUAGE OverloadedStrings #-}

module FeatureStore.DSL where

import Data.Text (Text)
import qualified Data.Text as T

-- 定义数据源的类型
-- Algebraic Data Type (ADT) 能够精确地建模我们的数据来源
data FeatureSource
  = HadoopTable { hTableName :: Text, hDateField :: Text }
  | TimescaleHypertable { tsTableName :: Text, tsTimestampField :: Text, tsEntityIdField :: Text }
  deriving (Show, Eq)

-- 定义支持的特征数据类型
data FeatureType = FInt | FDouble | FString | FTimestamp
  deriving (Show, Eq)

-- 定义特征转换操作
-- 这里的每一个构造函数都代表一种特征计算逻辑
data Transformation
  = Identity Text -- 直接从源字段获取
  | CountDistinct Text -- 对某个字段计算去重计数
  | Sum Text -- 对某个字段求和
  | TimeWindowed {
      aggFunc :: AggFunction,
      aggField :: Text,
      windowSize :: Int, -- in days
      slideInterval :: Int -- in days. For simplicity, assume daily sliding.
    }
  deriving (Show, Eq)

data AggFunction = AggSum | AggCount | AggAvg
  deriving (Show, Eq)

-- 核心的特征定义结构
data FeatureDef = FeatureDef {
    featureName :: Text,
    featureType :: FeatureType,
    featureSource :: FeatureSource,
    transformation :: Transformation
} deriving (Show, Eq)

-- 代码生成器接口
-- 这是一个类型类,任何可以被生成的目标平台都需要实现它
class CodeGenerator target where
  generate :: FeatureDef -> target

-- 为Spark SQL实现代码生成器
newtype SparkSQL = SparkSQL { getQuery :: Text } deriving (Show)

instance CodeGenerator SparkSQL where
  generate featureDef = SparkSQL $ case featureSource featureDef of
    HadoopTable table dateField ->
      let
        transSQL = case transformation featureDef of
          Identity field -> field
          CountDistinct field -> T.concat ["COUNT(DISTINCT ", field, ")"]
          Sum field -> T.concat ["SUM(", field, ")"]
          -- 批处理中的时间窗口通常通过分区实现
          -- 这里是一个简化的例子,实际情况会更复杂
          TimeWindowed aggFun field days _ ->
            let
              aggStr = case aggFun of
                AggSum -> "SUM"
                AggCount -> "COUNT"
                AggAvg -> "AVG"
            in
              T.concat [ aggStr, "(", field, ") OVER (PARTITION BY entity_id ORDER BY ", dateField, " ROWS BETWEEN ", T.pack (show (days - 1)), " PRECEDING AND CURRENT ROW)"]
      in
        T.unlines [
          "SELECT",
          "  entity_id,",
          "  " <> transSQL <> " AS " <> featureName featureDef <> ",",
          "  " <> dateField,
          "FROM " <> table,
          "-- Production code would need robust date filtering and partitioning logic here",
          "WHERE " <> dateField <> " >= date_sub(current_date, 30) -- Example filter"
        ]
    _ -> "Error: Incompatible source for Spark Batch processing"

-- 为TimescaleDB SQL实现代码生成器
newtype TimescaleSQL = TimescaleSQL { getTsQuery :: Text } deriving (Show)

instance CodeGenerator TimescaleSQL where
  generate featureDef = TimescaleSQL $ case featureSource featureDef of
    TimescaleHypertable table tsField entityId ->
      let
        transSQL = case transformation featureDef of
          -- 简化示例,在线查询通常是针对单个实体
          TimeWindowed aggFun field days _ ->
            let
              aggStr = case aggFun of
                AggSum -> "SUM"
                AggCount -> "COUNT"
                AggAvg -> "AVG"
              windowInterval = T.pack (show days) <> " days"
            in
              T.unlines [
                "SELECT",
                "  " <> entityId <> ",",
                "  " <> aggStr <> "(" <> field <> ") AS " <> featureName featureDef,
                "FROM " <> table,
                "WHERE " <> entityId <> " = ? AND", -- '?' is a placeholder for the entity ID
                "  " <> tsField <> " >= NOW() - INTERVAL '" <> windowInterval <> "'",
                "GROUP BY " <> entityId
              ]
          _ -> "Error: Transformation not supported for TimescaleDB online query"
      in
        transSQL
    _ -> "Error: Incompatible source for TimescaleDB processing"

特征定义示例

数据科学家只需要在专门的Haskell文件中添加如下定义:

-- src/Features/UserFeatures.hs

module Features.UserFeatures where

import FeatureStore.DSL

-- 特征1: 用户总订单数 (批处理)
userTotalOrders :: FeatureDef
userTotalOrders = FeatureDef {
    featureName = "user_total_orders",
    featureType = FInt,
    featureSource = HadoopTable "prod.orders" "order_date",
    transformation = CountDistinct "order_id"
}

-- 特征2: 用户近7天交易额 (时序)
user7dTransactionSum :: FeatureDef
user7dTransactionSum = FeatureDef {
    featureName = "user_7d_gmv",
    featureType = FDouble,
    featureSource = TimescaleHypertable "events.transactions" "event_timestamp" "user_id",
    transformation = TimeWindowed {
        aggFunc = AggSum,
        aggField = "amount",
        windowSize = 7,
        slideInterval = 1
    }
}

当CI运行时,FeatureCompiler会读取这些定义并生成:

  1. 一个给Spark执行的SQL文件,用于计算user_total_orders
  2. 一个给在线服务使用的SQL模板,用于查询user_7d_gmv
  3. 两个JSON文档,描述这两个特征的元数据,并将其索引到Elasticsearch中。

Elasticsearch元数据存储

每个特征生成的元数据文档结构如下,被索引到Elasticsearch中,以支持全文搜索和结构化查询。

{
  "featureName": "user_7d_gmv",
  "version": "v1.2.0",
  "author": "data_science_team_a",
  "description": "The total transaction amount for a user in the last 7 days.",
  "featureType": "FDouble",
  "source": {
    "type": "TimescaleHypertable",
    "tableName": "events.transactions",
    "timestampField": "event_timestamp",
    "entityIdField": "user_id"
  },
  "transformation": {
    "type": "TimeWindowed",
    "aggFunc": "AggSum",
    "aggField": "amount",
    "windowSizeDays": 7
  },
  "createdAt": "2023-10-27T10:15:21Z",
  "tags": ["user_profile", "risk_control", "realtime"]
}

一个ML工程师可以通过简单的查询找到所有与用户交易相关的实时特征:GET /features/_search?q=tags:realtime AND description:*transaction*

架构的扩展性与局限性

此架构的主要扩展点在于CodeGenerator类型类。当我们需要支持新的计算引擎,例如Flink或一个内部的C++服务时,我们只需要为它实现一个新的CodeGenerator实例。只要原始的Haskell DSL定义不变,我们就能保证新目标平台上的逻辑与其他平台保持一致。

然而,该方案的局限性也十分明显。首先,DSL的表现力是有限的。对于需要复杂Python UDF的特征(例如,调用一个NLP模型进行文本处理),目前的DSL设计无法优雅地表达。一种可能的演进方向是在DSL中引入一个ExternalUDF构造函数,它只定义UDF的输入输出类型,具体的实现逻辑则由各平台分别维护,但这在一定程度上牺牲了端到端的逻辑一致性保证。其次,对Haskell编译环境和构建工具链(如Cabal或Stack)的维护,给DevOps团队带来了额外的负担。最后,该方案的成功高度依赖于跨团队的文化认同和纪律,如果某个团队为了“快速上线”而绕过DSL,直接在下游系统中手写特征逻辑,整个体系的根基就会动摇。


  目录