要将 Apache Flink 作业中经过复杂计算得出的状态,实时、可靠且类型安全地同步到一个远在浏览器中的 React 前端,这是一个典型的现代数据应用难题。问题核心在于,后端是基于 JVM 的、面向状态与流的世界,而前端是基于 TypeScript 的、面向事件与组件渲染的世界。两者之间存在巨大的技术鸿沟。
常规的解决方案,如客户端轮询 REST API,在延迟、服务器负载和实时性上都存在根本缺陷。它无法捕捉到 Flink 状态在毫秒间的瞬时变化,也无法应对高频更新场景。这套方案从架构上就输了。
另一种方案是采用 WebSocket 进行服务端推送。这在方向上是正确的,但魔鬼藏在细节里。如何管理成千上万个 WebSocket 的持久连接?如何保证 Flink 作业的输出能精准地路由到正确的客户端?当连接中断后如何恢复状态?最重要的是,如何构建一个抽象层,让前端开发者不必关心底层的连接管理、消息反序列化和重试逻辑,而是像操作本地状态一样,消费来自 Flink 的实时数据流?
本文记录的便是架构决策与核心库的实现过程,目标是构建一套完整的解决方案,连接 AWS 上的 Flink 应用与基于 Valtio 的 React 前端。我们将对比两种截然不同的架构模式,并最终选择事件驱动的推送模型。接着,我们会深入实现一个前端核心库,它将负责处理所有复杂性,为上层应用提供一个干净、类型安全的接口。
方案 A:基于请求-响应的拉取模型(REST API + Polling)
这是一种传统且易于理解的模式。
实现路径: Flink 作业将计算结果(例如,每分钟的用户活跃度)写入一个低延迟的数据库,如 AWS DynamoDB 或 ElastiCache (Redis)。前端应用则通过一个 API Gateway 和 Lambda 函数暴露的 RESTful 接口,以固定的时间间隔(如每5秒)轮询最新的状态。
优势:
- 简单性: 整个技术栈非常成熟,前后端职责清晰。
- 无状态: HTTP 服务本身是无状态的,易于水平扩展。
- 容错性: 客户端的轮询逻辑可以很简单地处理请求失败(例如,通过重试)。
劣势:
- 高延迟: 数据的可见性延迟最低也是轮询间隔,无法做到真正的实时。对于需要秒级甚至亚秒级响应的场景,此方案完全不适用。
- 资源浪费: 无论后端状态是否变化,客户端都会持续发送请求,造成大量的无效网络流量和服务器计算资源浪费。
- 状态不一致: 在两次轮询之间,状态可能已经多次变化,前端只能获取到最终结果,丢失了中间过程,这对于某些需要观察状态演变过程的应用是致命的。
- 扩展性问题: 当客户端数量剧增时,对数据库和 API 层的轮询压力会成为显著的瓶颈。
在真实项目中,这种模式仅适用于对实时性要求不高的仪表盘或报表系统。对于我们的目标——实时状态同步,它从根本上就是错误的。
方案 B:基于事件驱动的推送模型(WebSocket + Event Sourcing)
该模型将整个系统视为一个事件流。Flink 处理输入事件流,生成状态更新事件流,并通过一个持久化连接管道将这些事件推送给客户端。
实现路径: Flink 作业将聚合后的状态变更事件写入一个消息队列,例如 AWS Kinesis Data Streams。一个 Lambda 函数消费此流,并通过 AWS API Gateway WebSocket API 将事件推送给已连接的、订阅了特定主题的客户端。前端核心库负责管理 WebSocket 连接,并在接收到事件后,更新 Valtio 状态。
优势:
- 极低延迟: 数据一旦由 Flink 处理完成,几乎可以立即被推送到前端,延迟通常在毫秒级别。
- 高效: 只有在状态真正发生变化时,数据才会被推送,没有无效的网络请求。
- 服务端驱动: 通信由服务端发起,完美匹配 Flink 的流式计算模型。
劣势:
- 架构复杂度: 引入了 WebSocket API Gateway、Kinesis Streams 和 Lambda,增加了系统的维护单元。
- 连接管理: 服务端需要维护客户端的连接状态。API Gateway 为此提供了开箱即用的支持,但开发者需要自行管理连接ID与业务实体(如用户ID、订阅主题)的映射关系,通常使用 DynamoDB。
- 客户端鲁棒性: 客户端必须处理连接的生命周期,包括断线重连、心跳维持、消息丢失的补偿机制等。这正是我们需要构建核心库来封装的复杂性。
最终决策: 我们选择方案 B。尽管其复杂度更高,但只有它才能满足端到端实时性的核心需求。技术选型的本质是权衡,为了获得极致的实时体验,我们愿意承担更高的架构复杂性,并通过构建一个健壮的核心库来管理这种复杂性。
核心实现概览
我们的系统架构如下,它体现了事件驱动设计的精髓。
graph TD A[数据源/事件源] -->|写入| B(AWS Kinesis Data Streams); B --> C{Apache Flink on KDA}; C -->|处理/聚合| D[Output Kinesis Stream]; D -->|触发| E(AWS Lambda Function); subgraph Frontend G[React App] -->|使用核心库| H(Valtio State); end subgraph AWS Cloud E -->|通过ConnectionId推送| F(API Gateway WebSocket API); end F -->|WebSocket Message| G; subgraph Flink Job State Management C -- State Backend --> S3/RockDB; end subgraph Connection Management I[DynamoDB Table] F -- OnConnect/OnDisconnect --> J(Connection Lambda) J -- 读/写 --> I E -- 读取ConnectionId --> I end
1. 后端:Apache Flink 状态计算作业
这里的关键是 Flink 作业本身。它不能是一个简单的无状态转换,必须是一个有状态的作业,这样才能体现出状态同步的价值。我们以一个用户行为分析场景为例:计算每个用户在30秒滚动窗口内的点击次数。
这是一个生产级的 Flink Java 作业示例,它从 Kinesis 读取用户点击事件,进行有状态的窗口计算,然后将结果写回另一个 Kinesis 流。
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Map;
import java.util.Properties;
// 数据模型:输入事件
class ClickEvent {
public String userId;
public long timestamp;
public String eventType;
// 为了简化,省略构造函数、getter/setter和toString
// 在真实项目中,强烈建议使用 Avro 或 Protobuf 进行序列化
}
// 数据模型:输出结果
class UserWindowedStats {
public String userId;
public long windowEnd;
public long clickCount;
// 省略...
}
public class RealtimeUserStatsJob {
private static final String INPUT_STREAM_NAME = "flink-input-stream";
private static final String OUTPUT_STREAM_NAME = "flink-output-stream-for-ui";
private static final String AWS_REGION = "us-east-1";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 Kinesis Analytics 运行时获取配置,如果在本地运行则使用默认值
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
Properties consumerProps = applicationProperties.getOrDefault("ConsumerConfigProperties", new Properties());
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, AWS_REGION);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
kinesisConsumerConfig.putAll(consumerProps);
DataStream<String> kinesisStream = env.addSource(
new FlinkKinesisConsumer<>(INPUT_STREAM_NAME, new SimpleStringSchema(), kinesisConsumerConfig));
DataStream<UserWindowedStats> statsStream = kinesisStream
// 步骤1: 反序列化和时间戳分配
// 这里的错误处理至关重要,生产代码应包含一个侧输出流来捕获脏数据
.map((MapFunction<String, ClickEvent>) value -> {
try {
// 假设输入是简单的JSON字符串,例如: {"userId":"u123", "timestamp":167...}
// 在生产中,使用 Jackson 或 Gson
return deserialize(value);
} catch (Exception e) {
// LOG.warn("Failed to parse event: {}", value, e);
return null; // 返回null以便后续过滤
}
})
.filter(java.util.Objects::nonNull)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.timestamp)
)
// 步骤2: 按 userId 分区
.keyBy(event -> event.userId)
// 步骤3: 定义一个30秒的滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
// 步骤4: 在窗口上进行聚合计算
// .reduce() 或 .aggregate() 提供了更高效的增量聚合
.apply((userId, window, events, out) -> {
long count = 0;
for (ClickEvent ignored : events) {
count++;
}
UserWindowedStats stats = new UserWindowedStats();
stats.userId = userId;
stats.windowEnd = window.getEnd();
stats.clickCount = count;
out.collect(stats);
});
// 步骤5: 将聚合结果序列化并发送到输出Kinesis流
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, AWS_REGION);
// 配置生产者,例如聚合和批处理大小,以优化成本和性能
// kinesisProducerConfig.setProperty("AggregationEnabled", "true");
FlinkKinesisProducer<String> kinesisProducer = new FlinkKinesisProducer<>(
(stats) -> serialize(stats), // 序列化为JSON字符串
kinesisProducerConfig
);
kinesisProducer.setDefaultStream(OUTPUT_STREAM_NAME);
kinesisProducer.setDefaultPartition("0"); // 简单起见,可根据userId哈希分区
statsStream
.map(RealtimeUserStatsJob::serialize)
.addSink(kinesisProducer);
env.execute("Flink Real-time User Stats Job");
}
// 辅助的序列化/反序列化方法
private static ClickEvent deserialize(String value) {
// 实现 JSON -> Object 的转换
return new ClickEvent(); // 伪代码
}
private static String serialize(UserWindowedStats stats) {
// 实现 Object -> JSON 的转换
return "{\"userId\":\"" + stats.userId + "\",\"clickCount\":" + stats.clickCount + "}"; // 伪代码
}
}
这个作业的要点在于:它是有状态的。Flink 的 state backend (如 RocksDB) 会持久化每个 userId
在当前窗口内的状态。即使作业失败重启,状态也能恢复,保证了计算的准确性。输出 UserWindowedStats
对象就是我们需要同步到前端的状态。
2. 前端:类型安全的 Valtio-Flink 核心库
现在进入前端的核心部分。我们将构建一个名为 valtio-flink-connector
的微型库,它封装了所有与后端 WebSocket 通信的复杂逻辑。
库的设计目标:
- 类型安全: 与后端的数据模型保持一致的 TypeScript 类型定义。
- 声明式 API: 提供一个简单的 React Hook (
useFlinkSubscription
),传入订阅主题和初始状态即可。 - 自动连接管理: 自动处理 WebSocket 的连接、断开、心跳和带指数退避的重连。
- 状态融合: 将从 WebSocket 收到的数据无缝地更新到 Valtio 的 proxy state 中。
下面是这个核心库的 TypeScript 实现:
import { proxy, useSnapshot } from 'valtio';
import { useRef, useEffect, useState } from 'react';
// ==========================================================
// 1. 类型定义 - 与后端 Flink 输出模型严格对应
// ==========================================================
interface UserWindowedStats {
userId: string;
windowEnd: number;
clickCount: number;
}
// 泛型消息结构,后端推送的消息必须遵循此格式
interface FlinkMessage<T> {
type: 'DATA_UPDATE' | 'SYSTEM_INFO';
payload: T;
}
// 连接状态
type ConnectionStatus = 'CONNECTING' | 'CONNECTED' | 'DISCONNECTED' | 'RECONNECTING';
// ==========================================================
// 2. WebSocket 连接管理器 (非 React 组件,纯粹的逻辑类)
// ==========================================================
class WebSocketManager {
private ws: WebSocket | null = null;
private url: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private reconnectInterval = 1000; // 初始1秒
// 各种事件的回调函数
public onOpen: () => void = () => {};
public onClose: () => void = () => {};
public onError: (event: Event) => void = () => {};
public onMessage: (data: any) => void = () => {};
constructor(url: string) {
this.url = url;
}
public connect() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
// console.log('WebSocket is already connected.');
return;
}
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
// console.log('WebSocket connected.');
this.reconnectAttempts = 0; // 连接成功后重置重试计数器
this.onOpen();
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.onMessage(message);
} catch (error) {
console.error('Failed to parse incoming message:', event.data, error);
}
};
this.ws.onerror = (event) => {
console.error('WebSocket error:', event);
this.onError(event);
};
this.ws.onclose = () => {
// console.log('WebSocket closed.');
this.onClose();
this.reconnect();
};
}
private reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(this.reconnectInterval * Math.pow(2, this.reconnectAttempts), 30000);
// console.log(`Attempting to reconnect in ${delay}ms... (Attempt ${this.reconnectAttempts})`);
setTimeout(() => this.connect(), delay);
} else {
console.error('Max reconnect attempts reached. Giving up.');
}
}
public send(data: object) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
} else {
console.error('Cannot send message, WebSocket is not open.');
}
}
public close() {
if (this.ws) {
this.ws.onclose = null; // 阻止自动重连
this.ws.close();
}
}
}
// ==========================================================
// 3. 核心 React Hook - `useFlinkSubscription`
// ==========================================================
// 使用泛型<T>来定义状态类型
export function useFlinkSubscription<T>(
webSocketUrl: string,
subscriptionKey: string,
initialState: T
) {
// 使用 Valtio proxy 创建响应式状态
// useRef确保proxyState在组件重渲染之间保持不变
const stateRef = useRef(proxy<{ data: T; status: ConnectionStatus }>({
data: initialState,
status: 'DISCONNECTED',
}));
// WebSocket管理器的实例也应在重渲染间保持
const managerRef = useRef<WebSocketManager | null>(null);
useEffect(() => {
if (!webSocketUrl || !subscriptionKey) return;
const manager = new WebSocketManager(webSocketUrl);
managerRef.current = manager;
const updateStatus = (status: ConnectionStatus) => {
stateRef.current.status = status;
};
manager.onOpen = () => {
updateStatus('CONNECTED');
// 连接成功后,发送订阅消息
// 后端Lambda可根据此消息将ConnectionId与userId关联起来
manager.send({ action: 'subscribe', key: subscriptionKey });
};
manager.onClose = () => {
updateStatus('RECONNECTING');
};
manager.onError = () => {
updateStatus('DISCONNECTED');
};
manager.onMessage = (message: FlinkMessage<Partial<T>>) => {
if (message.type === 'DATA_UPDATE' && message.payload) {
// 这里的融合是关键:直接修改valtio的proxy对象
// Valtio会负责高效地触发相关组件的重渲染
Object.assign(stateRef.current.data as object, message.payload);
}
};
updateStatus('CONNECTING');
manager.connect();
// 组件卸载时清理连接
return () => {
manager.close();
managerRef.current = null;
};
}, [webSocketUrl, subscriptionKey]); // 依赖项确保URL或Key变化时重新连接
// 返回一个 useSnapshot 的包装,这样组件就能订阅状态变化
return useSnapshot(stateRef.current);
}
// ==========================================================
// 4. 在React组件中使用该Hook
// ==========================================================
/*
const UserStatsDisplay = ({ userId }) => {
const WEBSOCKET_API_URL = "wss://your-api-id.execute-api.us-east-1.amazonaws.com/prod";
const { data, status } = useFlinkSubscription<UserWindowedStats>(
WEBSOCKET_API_URL,
userId, // 订阅这个用户的统计数据
{ userId, windowEnd: 0, clickCount: 0 } // 初始状态
);
return (
<div>
<h2>User Stats for: {userId}</h2>
<p>Connection Status: {status}</p>
<p>Total Clicks (last 30s): <strong>{data.clickCount}</strong></p>
<p>Last Updated Window: {new Date(data.windowEnd).toLocaleTimeString()}</p>
</div>
);
};
*/
这个核心库通过useFlinkSubscription
这个单一入口点,隐藏了所有的复杂性。UI开发者只需要提供WebSocket地址、一个唯一的订阅键(例如userId
)和初始状态,就能获得一个实时更新的、由Valtio驱动的状态对象。当Flink作业产生新的UserWindowedStats
并推送到前端时,Object.assign
会直接修改proxy state,Valtio的魔法会确保只有消费了clickCount
或windowEnd
的组件才会重新渲染,性能极高。
架构的扩展性与局限性
当前这套架构在实时性、类型安全和前端开发体验上表现出色,但它并非没有局限。
一个主要的考量是消息传递语义。从Flink到Kinesis,再到Lambda,我们可以配置实现端到端的Exactly-Once或At-Least-Once。但是,从API Gateway WebSocket到客户端浏览器的这个环节,本质上是At-Least-Once。网络抖动可能导致消息重复,或者在客户端断线重连期间丢失消息。我们的核心库目前没有处理消息去重或序列号验证的逻辑,对于需要严格顺序和不重复的金融交易等场景,还需要在消息载体中加入序列ID,并在客户端进行状态校验。
另一个问题是冷启动与历史状态。当一个客户端首次连接时,它只能接收到从连接时刻起的新状态。它如何获取之前的历史状态?一个可行的方案是,useFlinkSubscription
在建立WebSocket连接后,可以额外触发一次HTTP请求到另一个REST API,拉取该subscriptionKey
的最新快照状态,用它来初始化Valtio state。这样就结合了拉取(用于初始化)和推送(用于实时更新)两种模式的优点。
最后,随着业务复杂度的增加,API Gateway WebSocket的管理成本也会上升。例如,扇出(fan-out)消息给大量订阅同一个主题的客户端,可能会对后端的Lambda和DynamoDB造成压力。届时可能需要考虑更专业的推送服务,或者在Lambda层引入更智能的批处理与聚合策略。