构建异构特征平台的技术选型与实现:融合Hadoop批处理、Dgraph图谱查询、Spring服务层与Phoenix实时监控


一个棘手的需求摆在了面前:我们需要构建一个特征平台,不仅要能处理每日T+1生成的数百TB用户行为日志,还要能以低于50ms的延迟响应复杂的、具备多跳关系的用户群体特征查询。例如,“查询最近7天内,与购买过A商品的用户有过至少两次互动的用户的综合风险评分”。这种需求直接排除了传统的技术栈。

传统的键值存储(如Redis、Cassandra)无法有效处理图关系查询,而单纯的关系型数据库在面对海量数据和复杂关系时,查询性能会急剧下降。这迫使我们必须进行一次跨技术栈的深度整合,探索一套非典型的解决方案。

定义问题:双模态特征存储的挑战

技术需求可以拆解为四个核心部分:

  1. 批量计算层 (Batch Layer): 必须有能力稳定、高效地处理每日新增的TB级原始日志,执行大规模的ETL和特征工程计算。这是典型的批处理场景。
  2. 图谱服务层 (Graph Serving Layer): 必须能存储数十亿节点和数百亿边的复杂关系网络,并提供低延迟的多跳图遍历查询能力。这是整个架构的技术核心难点。
  3. 在线服务层 (Online API Layer): 需要一个稳定、高并发的API网关,供上游的机器学习模型、风控引擎等服务调用。接口协议需要足够灵活,以适应多变的特征查询需求。
  4. 实时监控层 (Real-time Monitoring Layer): 需要一个实时的运维驾驶舱,用以监控数据导入状态、特征数据分布的漂移、服务API的P99延迟等关键指标。这要求一个高效的实时消息推送机制。

架构方案权衡:从“常规”到“异构”

方案A: 业界相对常规的Lambda架构变体

  • 批量计算层: Hadoop MapReduce / Spark on YARN
  • 服务层数据存储: Elasticsearch / Cassandra
  • 在线服务层: Spring Boot
  • 实时监控层: Prometheus + Grafana + 一个基于REST轮询的前端

这种架构的优势在于技术成熟度高,社区庞大,招聘和维护相对容易。Hadoop/Spark处理批处理任务是行业标准。Spring Boot构建REST API也是驾轻就熟。

但它的致命缺陷在于服务层。 Elasticsearch虽然能做一些关联查询,但本质是搜索引擎,对于深度的、递归式的图遍历(如朋友的朋友)性能极差且查询语法复杂。Cassandra作为列式存储,模型设计需要围绕查询模式(Query-Driven Modeling),一旦查询模式变化,可能需要重构数据表,无法应对我们对图谱探索的灵活性要求。使用它来模拟图,本质上是应用层做了大量的JOIN操作,延迟根本无法满足要求。

方案B: 拥抱异构技术栈的特化方案

  • 批量计算层: Hadoop MapReduce (利用现有集群和成熟的ETL脚本)
  • 图谱服务层: Dgraph (一个原生的分布式图数据库)
  • 在线服务层: Spring Boot (集成GraphQL,与Dgraph天然契合)
  • 实时监控层: Phoenix Framework (利用其强大的WebSocket和并发能力构建实时Dashboard)

这个方案非常大胆。它引入了Dgraph和Phoenix这两个在Java技术栈中相对小众的技术。

优势:

  1. 专业化: Dgraph就是为高性能图查询而生的。它底层使用专门的图存储结构,处理多跳查询的性能远超任何模拟方案。其原生支持GraphQL,使得与前端和API层的集成异常简单。
  2. 实时性: Phoenix基于Erlang VM (BEAM),其处理海量并发长连接(如WebSocket)的能力是其核心优势。用它来做监控驾驶舱,可以轻松实现服务器端推送,避免了前端低效的轮询,数据更新可以做到毫秒级。

劣势:

  1. 技术栈复杂性: 团队需要同时维护Hadoop (Java/Scala), Dgraph (Go), Spring (Java/Kotlin), Phoenix (Elixir) 四个异构的技术栈。这对团队技能、运维监控、CI/CD都提出了极高的挑战。
  2. 社区与生态: Dgraph和Phoenix的社区规模远小于方案A中的组件,遇到问题的解决成本可能会更高。

最终决策:
考虑到“低延迟的复杂关系查询”是本次项目的核心业务价值,我们不能在这个关键点上妥协。方案A的短板是致命的。因此,我们选择承担方案B的技术复杂性,以换取在核心功能上的压倒性优势。在真实项目中,这种基于核心需求的“赌注”是架构决策的关键。

架构与数据流设计

我们将整个系统的数据流设计如下,通过一个Mermaid图来清晰展示:

graph TD
    subgraph Hadoop Cluster
        A[Raw Logs on HDFS] --> B{MapReduce Job};
        B --> C[Feature Data in Parquet on HDFS];
    end

    subgraph Data Ingestion Service
        D[Spring Batch Application] -- Reads --> C;
        D -- Formats & Bulk Loads --> E(Dgraph Cluster);
        D -- Sends Metrics --> F[Message Queue - RabbitMQ];
    end

    subgraph Online Serving
        G[ML/Risk Engines] -- GraphQL Query --> H[Spring Boot API Service];
        H -- Dgraph GQL --> E;
    end

    subgraph Real-time Monitoring
        I[Elixir Phoenix App] -- Consumes --> F;
        I -- WebSocket Push --> J(Monitoring Dashboard);
    end

    E -- Health & Query Metrics --> F;

数据流转过程:

  1. 每日的原始日志存储在HDFS上。
  2. 一个定时调度的Hadoop MapReduce作业启动,对原始日志进行清洗、转换,并计算出用户与实体间的关系,最终将结构化的特征数据以Parquet格式输出到HDFS的另一个目录。
  3. 一个独立的Spring Batch应用负责数据导入。它会读取HDFS上的Parquet文件,将其转换为Dgraph的RDF N-Quad格式,然后通过Dgraph的Bulk Loader或gRPC接口高效地导入数据。
  4. 在导入过程中或完成后,该应用会向RabbitMQ发送关键指标,如导入记录数、耗时、数据分布摘要等。
  5. 上游应用通过向Spring Boot API服务发送GraphQL查询来获取特征。
  6. Spring Boot服务将GraphQL查询转换为Dgraph的原生查询,请求Dgraph集群并返回结果。
  7. Phoenix应用中有一个消费者进程,持续监听RabbitMQ。一旦收到新的监控指标,它会通过Phoenix Channels将数据实时推送到前端监控Dashboard,无需前端刷新或轮询。

核心实现细节与代码

1. Hadoop MapReduce 特征提取

这里的核心是将非结构化的日志转化为结构化的图关系。假设日志格式为 timestamp, userId, action, targetId

// Simplified Mapper to extract user-target relationships
public class FeatureExtractorMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split(",");
        if (parts.length == 4) {
            String userId = parts[1];
            String action = parts[2];
            String targetId = parts[3];

            // We want to model the interaction as an edge property
            // Key: <user_id> <target_id>, Value: 1 (for counting interactions)
            if ("interact".equals(action)) {
                word.set(userId + "\t" + targetId);
                context.write(word, one);
            }
        }
    }
}

// Simplified Reducer to aggregate interactions
public class InteractionCountReducer extends Reducer<Text, IntWritable, Text, NullWritable> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        // Output format ready for Parquet conversion: user_id, target_id, interaction_count
        String[] ids = key.toString().split("\t");
        String output = String.format("{\"user_id\": \"%s\", \"target_id\": \"%s\", \"interaction_count\": %d}", ids[0], ids[1], sum);
        result.set(output);
        context.write(result, NullWritable.get());
    }
}
  • 设计考量: 在真实项目中,我们会使用Avro或Protobuf作为中间格式,并用Spark替代MapReduce以获得更好的性能和更易用的API。但这里的核心思想是相同的:将日志数据转换为描述节点和边的结构化数据。输出使用JSON Lines格式只是为了演示,生产中会直接写入Parquet。

2. Dgraph Schema 与数据导入

Dgraph的schema定义了图谱的结构。我们使用GraphQL+- (Dgraph的查询语言) 的type来定义。

schema.graphql

type User {
    user_id: String! @id
    risk_score: Float
    interacts_with: [User] @hasInverse(field: interacts_with)
    purchased: [Product]
}

type Product {
    product_id: String! @id
    name: String
    category: String
}
  • 注意: @id指令告诉Dgraph user_idproduct_id是外部唯一标识符,可以用来进行upsert操作。@hasInverse创建了一个双向边,这对于很多图查询至关重要。

数据导入 (Spring Batch ItemWriter)

// In a Spring Batch Job Configuration
@Bean
public ItemWriter<FeatureRecord> dgraphItemWriter(DgraphClient dgraphClient) {
    return items -> {
        // A common mistake is to send mutations one by one. This is extremely slow.
        // Always batch mutations.
        try (Transaction txn = dgraphClient.newTransaction()) {
            Mutation.Builder muBuilder = Mutation.newBuilder();
            List<NQuad> nQuads = new ArrayList<>();

            for (FeatureRecord item : items) {
                // Example: <_:user_uid> <dgraph.type> "User" .
                nQuads.add(NQuad.newBuilder()
                        .setSubject(String.format("_:%s", item.getUserId()))
                        .setPredicate("dgraph.type")
                        .setObjectValue(StringValue.newBuilder().setVal("User").build())
                        .build());
                // <_:user_uid> <user_id> "user_id_value" .
                nQuads.add(NQuad.newBuilder()
                        .setSubject(String.format("_:%s", item.getUserId()))
                        .setPredicate("user_id")
                        .setObjectValue(StringValue.newBuilder().setVal(item.getUserId()).build())
                        .build());

                // Create the relationship
                // <_:user_uid> <interacts_with> <_:target_uid> .
                nQuads.add(NQuad.newBuilder()
                        .setSubject(String.format("_:%s", item.getUserId()))
                        .setPredicate("interacts_with")
                        .setObjectId(String.format("_:%s", item.getTargetId()))
                        .build());

                // Add edge properties. This is a bit tricky in Dgraph.
                // We model it using facets.
                Facet facet = Facet.newBuilder()
                        .setKey("interaction_count")
                        .setValue(String.valueOf(item.getInteractionCount()).getBytes())
                        .setValType(Facet.ValType.INT)
                        .build();
                
                // The NQuad for the relationship needs to carry the facet.
                nQuads.add(NQuad.newBuilder()
                        .setSubject(String.format("_:%s", item.getUserId()))
                        .setPredicate("interacts_with")
                        .setObjectId(String.format("_:%s", item.getTargetId()))
                        .addFacets(facet) // Attach facet here
                        .build());
                
            }

            muBuilder.addAllSet(nQuads);
            txn.mutate(muBuilder.build());
            txn.commit();
        } catch (Exception e) {
            // In production, add robust error handling: retry logic, dead-letter queue, etc.
            log.error("Failed to write batch to Dgraph", e);
            throw new RuntimeException(e);
        }
    };
}
  • 这里的坑在于: Dgraph的性能很大程度上取决于你如何批量提交数据。逐条写入会导致大量的网络开销和事务开销。正确的做法是攒够一个大批次(比如1000条记录),在一个事务中通过Mutation一次性提交。另外,对于边上的属性(如交互次数),需要使用Facets来实现,这是一个常见的Dgraph建模技巧。

3. Spring Boot + GraphQL 服务层

我们使用 spring-boot-starter-graphql 来快速构建GraphQL端点。

FeatureController.java

@Controller
public class FeatureController {

    private final DgraphService dgraphService;

    // Standard constructor injection
    public FeatureController(DgraphService dgraphService) {
        this.dgraphService = dgraphService;
    }

    // This method maps to a GraphQL query named "complexFeatureQuery"
    @QueryMapping
    public FeatureResult complexFeatureQuery(@Argument String targetProductId, @Argument int minInteractions) {
        return dgraphService.findUsersWithComplexInteraction(targetProductId, minInteractions);
    }
}

DgraphService.java

@Service
@Slf4j
public class DgraphService {

    private final DgraphAsyncClient dgraphClient;

    public DgraphService(DgraphAsyncClient dgraphClient) {
        this.dgraphClient = dgraphClient;
    }

    // Unit test idea: Mock DgraphAsyncClient and verify the generated query string is correct.
    // Also test parsing logic for both success and error responses.
    public FeatureResult findUsersWithComplexInteraction(String targetProductId, int minInteractions) {
        // This query finds users who bought a product, then finds other users
        // who interacted with them, and filters those interactions.
        String query = String.format("""
            query GetComplexFeatures($productId: string, $minInteractions: int) {
              
              # Step 1: Find users who bought the target product
              var(func: eq(product_id, $productId)) {
                ~purchased {
                  buyers as uid
                }
              }
              
              # Step 2: From the buyers, find users they interacted with
              var(func: uid(buyers)) {
                interacts_with @filter(gt(interaction_count, $minInteractions)) {
                  interacting_users as uid
                }
              }

              # Step 3: Fetch details of the final user set
              final_users(func: uid(interacting_users)) {
                user_id
                risk_score
              }
            }
            """, targetProductId, minInteractions);
        
        Map<String, String> vars = new HashMap<>();
        vars.put("$productId", targetProductId);
        vars.put("$minInteractions", String.valueOf(minInteractions));

        try {
            Response response = dgraphClient.newReadOnlyTransaction().queryWithVars(query, vars).get();
            // In a real application, you'd use a proper JSON parsing library (e.g., Jackson)
            // to map this to your domain objects.
            JsonObject json = JsonParser.parseString(new String(response.getJson().toByteArray())).getAsJsonObject();
            
            // Add proper error handling for Dgraph errors, timeouts etc.
            // ... parsing logic to build FeatureResult object ...

            return new FeatureResult(/*...parsed data...*/);
        } catch (InterruptedException | ExecutionException e) {
            log.error("Dgraph query failed for product {}", targetProductId, e);
            Thread.currentThread().interrupt(); // Preserve interrupted status
            throw new DgraphQueryException("Failed to query Dgraph", e);
        }
    }
}
  • 设计考量: 使用Dgraph的var块是构建复杂查询的关键。它允许我们将一个查询的结果集作为下一个查询块的输入,形成一个管道。这比在客户端进行多次查询然后合并结果要高效得多。日志和异常处理至关重要,必须明确区分客户端错误和服务器端错误。

4. Phoenix 实时监控

Phoenix的核心是Channel,它提供了基于WebSocket的双向通信。

assets/js/socket.js (前端部分)

import {Socket} from "phoenix"

let socket = new Socket("/socket", {params: {token: window.userToken}})
socket.connect()

let channel = socket.channel("monitoring:lobby", {})
channel.on("new_metrics", payload => {
  // payload will contain {batch_id: "...", records_ingested: 10000, duration_ms: 5000}
  console.log("Received new ingestion metrics:", payload)
  // Here, you would update your charts (e.g., using D3.js, Chart.js)
  updateDashboard(payload);
})

channel.join()
  .receive("ok", resp => { console.log("Joined monitoring channel successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

lib/my_app_web/channels/monitoring_channel.ex (后端部分)

defmodule MyAppWeb.MonitoringChannel do
  use MyAppWeb, :channel

  # Called when a client joins the channel with a topic
  def join("monitoring:lobby", _payload, socket) do
    # Here you could add authentication/authorization logic
    {:ok, socket}
  end

  # This function is not called by clients directly.
  # It's an internal API for our application to broadcast messages.
  def handle_info({:new_metrics, metrics}, socket) do
    # Push the message to all connected clients on this topic
    push(socket, "new_metrics", metrics)
    {:noreply, socket}
  end

  # We can also handle messages from clients, but it's not needed for this use case.
  def handle_in("client_event", payload, socket) do
    # ...
    {:noreply, socket}
  end
end

RabbitMQ 消费者 lib/my_app/metrics_consumer.ex

defmodule MyApp.MetricsConsumer do
  use GenServer
  alias MyAppWeb.Endpoint

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # Setup connection to RabbitMQ
    {:ok, conn} = AMQP.Connection.open("amqp://guest:guest@localhost")
    {:ok, chan} = AMQP.Channel.open(conn)
    
    # Declare queue and consumer
    AMQP.Queue.declare(chan, "dgraph_ingest_metrics")
    AMQP.Basic.consume(chan, "dgraph_ingest_metrics", nil, no_ack: true)
    
    {:ok, %{channel: chan}}
  end

  # This callback handles incoming messages from RabbitMQ
  def handle_info({:basic_deliver, payload, _meta}, state) do
    # Decode the JSON payload
    metrics = Jason.decode!(payload)
    
    # Broadcast to the Phoenix channel.
    # The topic "monitoring:lobby" must match the one clients subscribe to.
    Endpoint.broadcast("monitoring:lobby", "new_metrics", metrics)
    
    {:noreply, state}
  end
end
  • 这里的关键: 我们没有将Phoenix直接暴露给数据导入服务。这是一个重要的架构解耦。Spring Batch服务只负责将消息推送到稳定的消息队列(RabbitMQ)。Phoenix应用作为消费者,从队列中获取消息,再通过Channel广播出去。这使得两个系统可以独立扩展和失败。如果Phoenix应用宕机,消息会暂存在RabbitMQ中,不会丢失。

架构的局限性与未来展望

这套异构架构虽然在功能上满足了我们极为苛刻的需求,但也引入了显而易见的挑战。首先是运维成本。维护四个完全不同的技术栈,需要一个技能全面的DevOps团队,并为每个组件建立独立的、深入的可观测性体系。其次,跨系统的数据一致性和schema演进是一个长期痛点,任何上游数据格式的变更都可能需要协调Hadoop、Spring和Dgraph三方的代码修改。

未来的一个优化方向可能是统一计算引擎。例如,用Flink替代Hadoop批处理和部分Spring Batch的逻辑,实现流批一体,从而减少一个重量级组件。另一个方向是探索Dgraph的订阅(Subscription)功能,如果其足够成熟,或许可以在某些场景下替代Phoenix+RabbitMQ的组合,以简化实时推送链路。但就目前而言,这套经过仔细权衡的组合拳,是解决特定领域复杂问题的有效实践。


  目录