一个棘手的需求摆在了面前:我们需要构建一个特征平台,不仅要能处理每日T+1生成的数百TB用户行为日志,还要能以低于50ms的延迟响应复杂的、具备多跳关系的用户群体特征查询。例如,“查询最近7天内,与购买过A商品的用户有过至少两次互动的用户的综合风险评分”。这种需求直接排除了传统的技术栈。
传统的键值存储(如Redis、Cassandra)无法有效处理图关系查询,而单纯的关系型数据库在面对海量数据和复杂关系时,查询性能会急剧下降。这迫使我们必须进行一次跨技术栈的深度整合,探索一套非典型的解决方案。
定义问题:双模态特征存储的挑战
技术需求可以拆解为四个核心部分:
- 批量计算层 (Batch Layer): 必须有能力稳定、高效地处理每日新增的TB级原始日志,执行大规模的ETL和特征工程计算。这是典型的批处理场景。
- 图谱服务层 (Graph Serving Layer): 必须能存储数十亿节点和数百亿边的复杂关系网络,并提供低延迟的多跳图遍历查询能力。这是整个架构的技术核心难点。
- 在线服务层 (Online API Layer): 需要一个稳定、高并发的API网关,供上游的机器学习模型、风控引擎等服务调用。接口协议需要足够灵活,以适应多变的特征查询需求。
- 实时监控层 (Real-time Monitoring Layer): 需要一个实时的运维驾驶舱,用以监控数据导入状态、特征数据分布的漂移、服务API的P99延迟等关键指标。这要求一个高效的实时消息推送机制。
架构方案权衡:从“常规”到“异构”
方案A: 业界相对常规的Lambda架构变体
- 批量计算层: Hadoop MapReduce / Spark on YARN
- 服务层数据存储: Elasticsearch / Cassandra
- 在线服务层: Spring Boot
- 实时监控层: Prometheus + Grafana + 一个基于REST轮询的前端
这种架构的优势在于技术成熟度高,社区庞大,招聘和维护相对容易。Hadoop/Spark处理批处理任务是行业标准。Spring Boot构建REST API也是驾轻就熟。
但它的致命缺陷在于服务层。 Elasticsearch虽然能做一些关联查询,但本质是搜索引擎,对于深度的、递归式的图遍历(如朋友的朋友)性能极差且查询语法复杂。Cassandra作为列式存储,模型设计需要围绕查询模式(Query-Driven Modeling),一旦查询模式变化,可能需要重构数据表,无法应对我们对图谱探索的灵活性要求。使用它来模拟图,本质上是应用层做了大量的JOIN操作,延迟根本无法满足要求。
方案B: 拥抱异构技术栈的特化方案
- 批量计算层: Hadoop MapReduce (利用现有集群和成熟的ETL脚本)
- 图谱服务层: Dgraph (一个原生的分布式图数据库)
- 在线服务层: Spring Boot (集成GraphQL,与Dgraph天然契合)
- 实时监控层: Phoenix Framework (利用其强大的WebSocket和并发能力构建实时Dashboard)
这个方案非常大胆。它引入了Dgraph和Phoenix这两个在Java技术栈中相对小众的技术。
优势:
- 专业化: Dgraph就是为高性能图查询而生的。它底层使用专门的图存储结构,处理多跳查询的性能远超任何模拟方案。其原生支持GraphQL,使得与前端和API层的集成异常简单。
- 实时性: Phoenix基于Erlang VM (BEAM),其处理海量并发长连接(如WebSocket)的能力是其核心优势。用它来做监控驾驶舱,可以轻松实现服务器端推送,避免了前端低效的轮询,数据更新可以做到毫秒级。
劣势:
- 技术栈复杂性: 团队需要同时维护Hadoop (Java/Scala), Dgraph (Go), Spring (Java/Kotlin), Phoenix (Elixir) 四个异构的技术栈。这对团队技能、运维监控、CI/CD都提出了极高的挑战。
- 社区与生态: Dgraph和Phoenix的社区规模远小于方案A中的组件,遇到问题的解决成本可能会更高。
最终决策:
考虑到“低延迟的复杂关系查询”是本次项目的核心业务价值,我们不能在这个关键点上妥协。方案A的短板是致命的。因此,我们选择承担方案B的技术复杂性,以换取在核心功能上的压倒性优势。在真实项目中,这种基于核心需求的“赌注”是架构决策的关键。
架构与数据流设计
我们将整个系统的数据流设计如下,通过一个Mermaid图来清晰展示:
graph TD subgraph Hadoop Cluster A[Raw Logs on HDFS] --> B{MapReduce Job}; B --> C[Feature Data in Parquet on HDFS]; end subgraph Data Ingestion Service D[Spring Batch Application] -- Reads --> C; D -- Formats & Bulk Loads --> E(Dgraph Cluster); D -- Sends Metrics --> F[Message Queue - RabbitMQ]; end subgraph Online Serving G[ML/Risk Engines] -- GraphQL Query --> H[Spring Boot API Service]; H -- Dgraph GQL --> E; end subgraph Real-time Monitoring I[Elixir Phoenix App] -- Consumes --> F; I -- WebSocket Push --> J(Monitoring Dashboard); end E -- Health & Query Metrics --> F;
数据流转过程:
- 每日的原始日志存储在HDFS上。
- 一个定时调度的Hadoop MapReduce作业启动,对原始日志进行清洗、转换,并计算出用户与实体间的关系,最终将结构化的特征数据以Parquet格式输出到HDFS的另一个目录。
- 一个独立的Spring Batch应用负责数据导入。它会读取HDFS上的Parquet文件,将其转换为Dgraph的RDF N-Quad格式,然后通过Dgraph的Bulk Loader或gRPC接口高效地导入数据。
- 在导入过程中或完成后,该应用会向RabbitMQ发送关键指标,如导入记录数、耗时、数据分布摘要等。
- 上游应用通过向Spring Boot API服务发送GraphQL查询来获取特征。
- Spring Boot服务将GraphQL查询转换为Dgraph的原生查询,请求Dgraph集群并返回结果。
- Phoenix应用中有一个消费者进程,持续监听RabbitMQ。一旦收到新的监控指标,它会通过Phoenix Channels将数据实时推送到前端监控Dashboard,无需前端刷新或轮询。
核心实现细节与代码
1. Hadoop MapReduce 特征提取
这里的核心是将非结构化的日志转化为结构化的图关系。假设日志格式为 timestamp, userId, action, targetId
。
// Simplified Mapper to extract user-target relationships
public class FeatureExtractorMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(",");
if (parts.length == 4) {
String userId = parts[1];
String action = parts[2];
String targetId = parts[3];
// We want to model the interaction as an edge property
// Key: <user_id> <target_id>, Value: 1 (for counting interactions)
if ("interact".equals(action)) {
word.set(userId + "\t" + targetId);
context.write(word, one);
}
}
}
}
// Simplified Reducer to aggregate interactions
public class InteractionCountReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// Output format ready for Parquet conversion: user_id, target_id, interaction_count
String[] ids = key.toString().split("\t");
String output = String.format("{\"user_id\": \"%s\", \"target_id\": \"%s\", \"interaction_count\": %d}", ids[0], ids[1], sum);
result.set(output);
context.write(result, NullWritable.get());
}
}
- 设计考量: 在真实项目中,我们会使用Avro或Protobuf作为中间格式,并用Spark替代MapReduce以获得更好的性能和更易用的API。但这里的核心思想是相同的:将日志数据转换为描述节点和边的结构化数据。输出使用JSON Lines格式只是为了演示,生产中会直接写入Parquet。
2. Dgraph Schema 与数据导入
Dgraph的schema定义了图谱的结构。我们使用GraphQL+- (Dgraph的查询语言) 的type
来定义。
schema.graphql
type User {
user_id: String! @id
risk_score: Float
interacts_with: [User] @hasInverse(field: interacts_with)
purchased: [Product]
}
type Product {
product_id: String! @id
name: String
category: String
}
- 注意:
@id
指令告诉Dgraphuser_id
和product_id
是外部唯一标识符,可以用来进行upsert操作。@hasInverse
创建了一个双向边,这对于很多图查询至关重要。
数据导入 (Spring Batch ItemWriter)
// In a Spring Batch Job Configuration
@Bean
public ItemWriter<FeatureRecord> dgraphItemWriter(DgraphClient dgraphClient) {
return items -> {
// A common mistake is to send mutations one by one. This is extremely slow.
// Always batch mutations.
try (Transaction txn = dgraphClient.newTransaction()) {
Mutation.Builder muBuilder = Mutation.newBuilder();
List<NQuad> nQuads = new ArrayList<>();
for (FeatureRecord item : items) {
// Example: <_:user_uid> <dgraph.type> "User" .
nQuads.add(NQuad.newBuilder()
.setSubject(String.format("_:%s", item.getUserId()))
.setPredicate("dgraph.type")
.setObjectValue(StringValue.newBuilder().setVal("User").build())
.build());
// <_:user_uid> <user_id> "user_id_value" .
nQuads.add(NQuad.newBuilder()
.setSubject(String.format("_:%s", item.getUserId()))
.setPredicate("user_id")
.setObjectValue(StringValue.newBuilder().setVal(item.getUserId()).build())
.build());
// Create the relationship
// <_:user_uid> <interacts_with> <_:target_uid> .
nQuads.add(NQuad.newBuilder()
.setSubject(String.format("_:%s", item.getUserId()))
.setPredicate("interacts_with")
.setObjectId(String.format("_:%s", item.getTargetId()))
.build());
// Add edge properties. This is a bit tricky in Dgraph.
// We model it using facets.
Facet facet = Facet.newBuilder()
.setKey("interaction_count")
.setValue(String.valueOf(item.getInteractionCount()).getBytes())
.setValType(Facet.ValType.INT)
.build();
// The NQuad for the relationship needs to carry the facet.
nQuads.add(NQuad.newBuilder()
.setSubject(String.format("_:%s", item.getUserId()))
.setPredicate("interacts_with")
.setObjectId(String.format("_:%s", item.getTargetId()))
.addFacets(facet) // Attach facet here
.build());
}
muBuilder.addAllSet(nQuads);
txn.mutate(muBuilder.build());
txn.commit();
} catch (Exception e) {
// In production, add robust error handling: retry logic, dead-letter queue, etc.
log.error("Failed to write batch to Dgraph", e);
throw new RuntimeException(e);
}
};
}
- 这里的坑在于: Dgraph的性能很大程度上取决于你如何批量提交数据。逐条写入会导致大量的网络开销和事务开销。正确的做法是攒够一个大批次(比如1000条记录),在一个事务中通过
Mutation
一次性提交。另外,对于边上的属性(如交互次数),需要使用Facets
来实现,这是一个常见的Dgraph建模技巧。
3. Spring Boot + GraphQL 服务层
我们使用 spring-boot-starter-graphql
来快速构建GraphQL端点。
FeatureController.java
@Controller
public class FeatureController {
private final DgraphService dgraphService;
// Standard constructor injection
public FeatureController(DgraphService dgraphService) {
this.dgraphService = dgraphService;
}
// This method maps to a GraphQL query named "complexFeatureQuery"
@QueryMapping
public FeatureResult complexFeatureQuery(@Argument String targetProductId, @Argument int minInteractions) {
return dgraphService.findUsersWithComplexInteraction(targetProductId, minInteractions);
}
}
DgraphService.java
@Service
@Slf4j
public class DgraphService {
private final DgraphAsyncClient dgraphClient;
public DgraphService(DgraphAsyncClient dgraphClient) {
this.dgraphClient = dgraphClient;
}
// Unit test idea: Mock DgraphAsyncClient and verify the generated query string is correct.
// Also test parsing logic for both success and error responses.
public FeatureResult findUsersWithComplexInteraction(String targetProductId, int minInteractions) {
// This query finds users who bought a product, then finds other users
// who interacted with them, and filters those interactions.
String query = String.format("""
query GetComplexFeatures($productId: string, $minInteractions: int) {
# Step 1: Find users who bought the target product
var(func: eq(product_id, $productId)) {
~purchased {
buyers as uid
}
}
# Step 2: From the buyers, find users they interacted with
var(func: uid(buyers)) {
interacts_with @filter(gt(interaction_count, $minInteractions)) {
interacting_users as uid
}
}
# Step 3: Fetch details of the final user set
final_users(func: uid(interacting_users)) {
user_id
risk_score
}
}
""", targetProductId, minInteractions);
Map<String, String> vars = new HashMap<>();
vars.put("$productId", targetProductId);
vars.put("$minInteractions", String.valueOf(minInteractions));
try {
Response response = dgraphClient.newReadOnlyTransaction().queryWithVars(query, vars).get();
// In a real application, you'd use a proper JSON parsing library (e.g., Jackson)
// to map this to your domain objects.
JsonObject json = JsonParser.parseString(new String(response.getJson().toByteArray())).getAsJsonObject();
// Add proper error handling for Dgraph errors, timeouts etc.
// ... parsing logic to build FeatureResult object ...
return new FeatureResult(/*...parsed data...*/);
} catch (InterruptedException | ExecutionException e) {
log.error("Dgraph query failed for product {}", targetProductId, e);
Thread.currentThread().interrupt(); // Preserve interrupted status
throw new DgraphQueryException("Failed to query Dgraph", e);
}
}
}
- 设计考量: 使用Dgraph的
var
块是构建复杂查询的关键。它允许我们将一个查询的结果集作为下一个查询块的输入,形成一个管道。这比在客户端进行多次查询然后合并结果要高效得多。日志和异常处理至关重要,必须明确区分客户端错误和服务器端错误。
4. Phoenix 实时监控
Phoenix的核心是Channel,它提供了基于WebSocket的双向通信。
assets/js/socket.js
(前端部分)
import {Socket} from "phoenix"
let socket = new Socket("/socket", {params: {token: window.userToken}})
socket.connect()
let channel = socket.channel("monitoring:lobby", {})
channel.on("new_metrics", payload => {
// payload will contain {batch_id: "...", records_ingested: 10000, duration_ms: 5000}
console.log("Received new ingestion metrics:", payload)
// Here, you would update your charts (e.g., using D3.js, Chart.js)
updateDashboard(payload);
})
channel.join()
.receive("ok", resp => { console.log("Joined monitoring channel successfully", resp) })
.receive("error", resp => { console.log("Unable to join", resp) })
export default socket
lib/my_app_web/channels/monitoring_channel.ex
(后端部分)
defmodule MyAppWeb.MonitoringChannel do
use MyAppWeb, :channel
# Called when a client joins the channel with a topic
def join("monitoring:lobby", _payload, socket) do
# Here you could add authentication/authorization logic
{:ok, socket}
end
# This function is not called by clients directly.
# It's an internal API for our application to broadcast messages.
def handle_info({:new_metrics, metrics}, socket) do
# Push the message to all connected clients on this topic
push(socket, "new_metrics", metrics)
{:noreply, socket}
end
# We can also handle messages from clients, but it's not needed for this use case.
def handle_in("client_event", payload, socket) do
# ...
{:noreply, socket}
end
end
RabbitMQ 消费者 lib/my_app/metrics_consumer.ex
defmodule MyApp.MetricsConsumer do
use GenServer
alias MyAppWeb.Endpoint
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
# Setup connection to RabbitMQ
{:ok, conn} = AMQP.Connection.open("amqp://guest:guest@localhost")
{:ok, chan} = AMQP.Channel.open(conn)
# Declare queue and consumer
AMQP.Queue.declare(chan, "dgraph_ingest_metrics")
AMQP.Basic.consume(chan, "dgraph_ingest_metrics", nil, no_ack: true)
{:ok, %{channel: chan}}
end
# This callback handles incoming messages from RabbitMQ
def handle_info({:basic_deliver, payload, _meta}, state) do
# Decode the JSON payload
metrics = Jason.decode!(payload)
# Broadcast to the Phoenix channel.
# The topic "monitoring:lobby" must match the one clients subscribe to.
Endpoint.broadcast("monitoring:lobby", "new_metrics", metrics)
{:noreply, state}
end
end
- 这里的关键: 我们没有将Phoenix直接暴露给数据导入服务。这是一个重要的架构解耦。Spring Batch服务只负责将消息推送到稳定的消息队列(RabbitMQ)。Phoenix应用作为消费者,从队列中获取消息,再通过Channel广播出去。这使得两个系统可以独立扩展和失败。如果Phoenix应用宕机,消息会暂存在RabbitMQ中,不会丢失。
架构的局限性与未来展望
这套异构架构虽然在功能上满足了我们极为苛刻的需求,但也引入了显而易见的挑战。首先是运维成本。维护四个完全不同的技术栈,需要一个技能全面的DevOps团队,并为每个组件建立独立的、深入的可观测性体系。其次,跨系统的数据一致性和schema演进是一个长期痛点,任何上游数据格式的变更都可能需要协调Hadoop、Spring和Dgraph三方的代码修改。
未来的一个优化方向可能是统一计算引擎。例如,用Flink替代Hadoop批处理和部分Spring Batch的逻辑,实现流批一体,从而减少一个重量级组件。另一个方向是探索Dgraph的订阅(Subscription)功能,如果其足够成熟,或许可以在某些场景下替代Phoenix+RabbitMQ的组合,以简化实时推送链路。但就目前而言,这套经过仔细权衡的组合拳,是解决特定领域复杂问题的有效实践。