Aeron Subscription.controlledPoll
深度解析受控轮询机制 - 递归剖析所有相关方法与类
概述
controlledPoll 是 Aeron 中用于受控消息轮询的核心方法。
与普通的 poll 方法不同,它允许消息处理器通过返回 Action 枚举值来精确控制消息消费的行为。
- 流量控制 - 消费者可以控制何时提交位置
- 重试机制 - 支持 ABORT 操作重新处理消息
- 灵活中断 - 支持 BREAK 操作提前结束轮询
- 批量处理 - 支持多消息批量处理后统一提交
主要应用场景
整体架构
类关系图
方法调用层次
Subscription.controlledPoll
Subscription.controlledPoll 是消息轮询的入口点,负责协调多个 Image(发布者连接)之间的消息读取。
方法定义
源码位置:io.aeron.Subscription 第 221-244 行
/** * 以受控方式轮询订阅下的所有 Image,获取可用的消息片段。 * 控制应用于流中的消息片段。即使某个流返回 BREAK 或 ABORT, * 其他流上如果有更多可读片段,仍会继续读取。 * * @param fragmentHandler 处理每个消息片段的回调函数 * @param fragmentLimit 跨多个 Image 轮询时的最大消息片段数 * @return 接收到的消息片段数量 */ public int controlledPoll( final ControlledFragmentHandler fragmentHandler, final int fragmentLimit) { // 获取当前订阅关联的所有 Image(每个 Image 代表一个发布者连接) final Image[] images = this.images; final int length = images.length; int fragmentsRead = 0; // 使用 Round-Robin 策略确定起始索引 // 每次调用后 roundRobinIndex 递增,确保公平轮询 int startingIndex = roundRobinIndex++; if (startingIndex >= length) { // 索引越界时重置为 0 roundRobinIndex = startingIndex = 0; } // 第一轮遍历:从 startingIndex 到数组末尾 for (int i = startingIndex; i < length && fragmentsRead < fragmentLimit; i++) { // 委托给 Image 进行实际的消息读取 // 剩余配额 = fragmentLimit - 已读取数量 fragmentsRead += images[i].controlledPoll(fragmentHandler, fragmentLimit - fragmentsRead); } // 第二轮遍历:从数组开头到 startingIndex(实现环形遍历) for (int i = 0; i < startingIndex && fragmentsRead < fragmentLimit; i++) { fragmentsRead += images[i].controlledPoll(fragmentHandler, fragmentLimit - fragmentsRead); } // 返回本次轮询读取的总片段数 return fragmentsRead; }
参数说明
| 参数名 | 类型 | 说明 |
|---|---|---|
fragmentHandler |
ControlledFragmentHandler |
消息片段处理器,每读取到一个消息片段就会回调其 onFragment 方法 |
fragmentLimit |
int |
本次轮询的最大消息片段数,用于限制单次轮询的工作量 |
| 返回值 | int |
实际读取到的消息片段数量 |
执行流程
通过 roundRobinIndex 变量,每次轮询从不同的 Image 开始,确保多个发布者的消息能够公平地被消费,避免某个 Image 的消息长期得不到处理的"饥饿"问题。
Image.controlledPoll
Image.controlledPoll 是消息读取的核心实现,它直接操作底层的 Term Buffer 来读取消息帧。
核心逻辑
源码位置:io.aeron.Image 第 406-484 行
public int controlledPoll( final ControlledFragmentHandler handler, final int fragmentLimit) { // 如果 Image 已关闭,直接返回 0 if (isClosed) { return 0; } // ==================== 初始化阶段 ==================== int fragmentsRead = 0; // 获取当前订阅者位置(绝对字节偏移量) long initialPosition = subscriberPosition.get(); // 计算在当前 Term 内的偏移量 // termLengthMask = termLength - 1, 用于快速取模运算 int initialOffset = (int) initialPosition & termLengthMask; int offset = initialOffset; // 获取当前活动的 Term Buffer final UnsafeBuffer termBuffer = activeTermBuffer(initialPosition); final int capacity = termBuffer.capacity(); // 设置 Header 的 buffer 引用 final Header header = this.header; header.buffer(termBuffer); // ==================== 消息读取循环 ==================== try { // 循环条件:未达到限制 AND 未超出容量 AND 未关闭 while (fragmentsRead < fragmentLimit && offset < capacity && !isClosed) { // 使用 volatile 语义读取帧长度 final int length = frameLengthVolatile(termBuffer, offset); // 长度 <= 0 表示没有更多数据 if (length <= 0) { break; } // 保存当前帧的起始偏移量 final int frameOffset = offset; // 计算对齐后的帧长度(32 字节对齐) final int alignedLength = BitUtil.align(length, FRAME_ALIGNMENT); offset += alignedLength; // 跳过填充帧(Term 末尾的对齐填充) if (isPaddingFrame(termBuffer, frameOffset)) { continue; } // 增加已读计数 ++fragmentsRead; // 设置 Header 的偏移量 header.offset(frameOffset); // ==================== 调用处理器 ==================== // 注意:传入的是消息体部分(跳过 32 字节头) final Action action = handler.onFragment( termBuffer, frameOffset + HEADER_LENGTH, // 数据起始位置 length - HEADER_LENGTH, // 数据长度 header ); // ==================== Action 处理 ==================== if (ABORT == action) { // ABORT: 回滚本次读取,不计入统计 --fragmentsRead; offset -= alignedLength; break; } if (BREAK == action) { // BREAK: 保留进度,提前结束循环 break; } if (COMMIT == action) { // COMMIT: 立即更新订阅者位置 initialPosition += (offset - initialOffset); initialOffset = offset; if (!isClosed) { subscriberPosition.setRelease(initialPosition); } } // CONTINUE: 继续循环,最后统一更新位置 } } catch (final Exception ex) { // 异常处理:通知错误处理器 errorHandler.onError(ex); } finally { // ==================== 最终位置更新 ==================== final long resultingPosition = initialPosition + (offset - initialOffset); if (resultingPosition > initialPosition && !isClosed) { subscriberPosition.setRelease(resultingPosition); } } return fragmentsRead; }
Action 处理机制
Action 对比表
| Action | fragmentsRead | offset | subscriberPosition | 继续循环 |
|---|---|---|---|---|
| ABORT | -1(不计入) | 回滚 | 不更新 | 否 |
| BREAK | 保持 | 保持 | finally 中更新 | 否 |
| COMMIT | 保持 | 保持 | 立即更新 | 是 |
| CONTINUE | 保持 | 保持 | finally 中更新 | 是 |
ControlledFragmentHandler
ControlledFragmentHandler 是一个函数式接口,定义了受控消息处理的回调规范。
@FunctionalInterface public interface ControlledFragmentHandler { /** * 处理器返回后要采取的动作 */ enum Action { // 中止当前轮询,不提交当前消息的位置 ABORT, // 中断当前轮询,提交到当前消息为止的位置 BREAK, // 立即提交位置,然后继续处理后续消息 COMMIT, // 继续处理,最后统一提交位置 CONTINUE } /** * 处理从日志中读取的数据片段的回调方法 * * 注意:在此回调中禁止重入调用 Aeron 客户端,否则会导致未定义行为 * * @param buffer 包含数据的缓冲区 * @param offset 数据开始的偏移量 * @param length 数据的字节长度 * @param header 包含数据元信息的头 * @return 处理完成后关于流位置应采取的动作 */ Action onFragment( DirectBuffer buffer, int offset, int length, Header header ); }
Action 枚举详解
ABORT
行为:中止当前轮询操作,不提交当前消息片段的位置。
使用场景:
- 处理过程中发生错误,需要重试
- 下游缓冲区已满,需要等待
- 事务处理失败,需要回滚
BREAK
行为:中断当前轮询操作,提交包括当前消息在内的所有已处理消息的位置。
使用场景:
- 已处理到某个边界点,需要暂停
- 检测到需要特殊处理的消息
- 时间片用完,需要让出 CPU
COMMIT
行为:立即提交当前位置,然后继续处理后续消息。
使用场景:
- 重要消息处理完毕,需要立即确认
- 事务边界,需要持久化进度
- 流量控制点,需要及时释放窗口
CONTINUE
行为:继续处理后续消息,在轮询结束后统一提交位置。
使用场景:
- 正常的消息处理流程
- 批量处理场景
- 对实时性要求不高的场景
ControlledFragmentAssembler
ControlledFragmentAssembler 是一个装饰器类,用于自动重组分片消息。当消息大于 MTU 时,Aeron 会将其分片发送,此类负责在接收端将这些分片重新组装成完整消息。
消息组装逻辑
分片消息标志位
public Action onFragment( final DirectBuffer buffer, final int offset, final int length, final Header header) { // 获取帧的标志位 final byte flags = header.flags(); Action action = Action.CONTINUE; // ==================== 情况1: 非分片消息 ==================== // 当 BEGIN 和 END 标志都设置时,表示这是完整的消息 if ((flags & UNFRAGMENTED) == UNFRAGMENTED) { // 直接委托给业务处理器,不需要组装 action = delegate.onFragment(buffer, offset, length, header); } // ==================== 情况2: 分片消息的第一个片段 ==================== else if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG) { // 获取或创建该 session 的 BufferBuilder final BufferBuilder builder = getBufferBuilder(header.sessionId()); // 重置并开始收集新的分片消息 builder.reset() // 清空之前的数据 .captureHeader(header) // 保存第一片的 Header .append(buffer, offset, length) // 追加第一片数据 .nextTermOffset(header.nextTermOffset()); // 记录下一片的预期位置 } // ==================== 情况3: 中间片段或最后片段 ==================== else { // 尝试获取已存在的 BufferBuilder final BufferBuilder builder = builderBySessionIdMap.get(header.sessionId()); if (null != builder) { // 验证片段的连续性(防止乱序或丢失) if (header.termOffset() == builder.nextTermOffset()) { // 保存追加前的位置(用于 ABORT 回滚) final int limit = builder.limit(); // 追加当前片段 builder.append(buffer, offset, length); // 检查是否为最后一个片段 if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG) { // ===== 消息组装完成,调用业务处理器 ===== action = delegate.onFragment( builder.buffer(), // 完整消息的缓冲区 0, // 从头开始 builder.limit(), // 完整消息的长度 builder.completeHeader(header) // 组合后的 Header ); // 根据处理结果决定是否回滚 if (Action.ABORT == action) { // ABORT: 回滚到追加前的状态,下次重新组装 builder.limit(limit); } else { // 其他 Action: 清空缓冲区,准备下一条消息 builder.reset(); } } else { // 还不是最后片段,记录下一片的预期位置 builder.nextTermOffset(header.nextTermOffset()); } } else { // 片段不连续(乱序或丢失),丢弃已收集的数据 builder.reset(); } } } return action; }
辅助类详解
Header 类 - 帧头元数据
源码位置:io.aeron.logbuffer.Header
Header 主要方法
| 方法 | 返回类型 | 说明 |
|---|---|---|
frameLength() | int | 帧的总长度(包括头部 32 字节) |
sessionId() | int | 会话 ID,标识发布者 |
streamId() | int | 流 ID |
termId() | int | Term ID,用于计算位置 |
termOffset() | int | 在 Term 内的偏移量 |
flags() | byte | 标志位(BEGIN/END) |
position() | long | 计算消息的绝对位置 |
reservedValue() | long | 保留值(可用于自定义元数据) |
LogBufferDescriptor - 日志缓冲区结构
源码位置:io.aeron.logbuffer.LogBufferDescriptor
/** * 根据位置计算 Term 索引(0, 1, 或 2) * 使用位运算实现高效的取模操作 */ public static int indexByPosition( final long position, final int positionBitsToShift) { // position >>> positionBitsToShift 相当于 position / termLength // % PARTITION_COUNT 取模得到索引 (0, 1, 2) return (int) ((position >>> positionBitsToShift) % PARTITION_COUNT); } /** * 计算绝对位置 * position = (termId - initialTermId) * termLength + termOffset */ public static long computePosition( final int activeTermId, final int termOffset, final int positionBitsToShift, final int initialTermId) { // 计算已经过的 Term 数量(支持 termId 溢出后的回绕) final long termCount = activeTermId - initialTermId; // termCount << positionBitsToShift 相当于 termCount * termLength return (termCount << positionBitsToShift) + termOffset; }
该值由 termLength 决定,等于 log2(termLength):
- termLength = 64KB → positionBitsToShift = 16
- termLength = 1MB → positionBitsToShift = 20
- termLength = 16MB → positionBitsToShift = 24
完整调用链
调用栈总结
Subscription.controlledPoll(handler, limit) │ ├── Image.controlledPoll(handler, limit) │ │ │ ├── subscriberPosition.get() │ │ │ ├── activeTermBuffer(position) │ │ └── LogBufferDescriptor.indexByPosition() │ │ │ └── [LOOP] 读取消息帧 │ │ │ ├── FrameDescriptor.frameLengthVolatile() │ ├── BitUtil.align() │ ├── FrameDescriptor.isPaddingFrame() │ ├── Header.offset() │ │ │ ├── handler.onFragment() │ │ │ │ │ ├── [UNFRAGMENTED] delegate.onFragment() │ │ │ │ │ └── [FRAGMENTED] │ │ ├── BufferBuilder.reset() │ │ ├── BufferBuilder.append() │ │ └── [END] delegate.onFragment() │ │ │ └── [处理 Action] │ ├── ABORT: 回滚 offset │ ├── BREAK: 退出循环 │ ├── COMMIT: subscriberPosition.setRelease() │ └── CONTINUE: 继续 │ └── [FINALLY] subscriberPosition.setRelease()
使用示例
// 创建订阅 Subscription subscription = aeron.addSubscription( "aeron:udp?endpoint=localhost:40123", streamId ); // 定义消息处理器 ControlledFragmentHandler handler = (buffer, offset, length, header) -> { try { // 处理消息 processMessage(buffer, offset, length); return Action.CONTINUE; } catch (Exception e) { // 处理失败,不提交位置,下次重试 return Action.ABORT; } }; // 轮询消息 while (running) { int fragmentsRead = subscription.controlledPoll(handler, 10); if (fragmentsRead == 0) { idleStrategy.idle(); } }
// 业务处理器(只接收完整消息) ControlledFragmentHandler businessHandler = (buffer, offset, length, header) -> { System.out.println("收到完整消息,长度: " + length); return Action.CONTINUE; }; // 使用 Assembler 包装,自动处理分片重组 ControlledFragmentAssembler assembler = new ControlledFragmentAssembler( businessHandler, 1024 * 1024, // 初始缓冲区大小: 1MB false // 不使用 DirectByteBuffer ); // 轮询时使用 assembler subscription.controlledPoll(assembler, 10);
final List<Message> batch = new ArrayList<>(); final int BATCH_SIZE = 100; ControlledFragmentHandler transactionHandler = (buffer, offset, length, header) -> { Message msg = parseMessage(buffer, offset, length); batch.add(msg); // 达到批次大小时处理 if (batch.size() >= BATCH_SIZE) { try { // 批量处理(例如写入数据库) processBatch(batch); batch.clear(); // 批量处理成功,立即提交位置 return Action.COMMIT; } catch (Exception e) { batch.clear(); // 处理失败,中止并重试整个批次 return Action.ABORT; } } // 继续收集批次 return Action.CONTINUE; };
final BlockingQueue<Message> outputQueue = new LinkedBlockingQueue<>(1000); ControlledFragmentHandler backpressureHandler = (buffer, offset, length, header) -> { Message msg = parseMessage(buffer, offset, length); // 检查下游队列容量 if (outputQueue.remainingCapacity() == 0) { // 队列已满,暂停消费 // ABORT 不会提交当前消息,下次轮询会重新读取 return Action.ABORT; } // 队列有空间,放入消息 outputQueue.offer(msg); return Action.CONTINUE; };
最佳实践
推荐做法
- 优先使用 CONTINUE - 获得最佳性能
- 仅在事务边界使用 COMMIT - 减少 I/O 开销
- 异常时返回 ABORT - 确保消息不丢失
- 合理设置 fragmentLimit - 平衡延迟与吞吐
- 复用 Assembler 实例 - 避免重复创建
- 及时清理 BufferBuilder - 释放内存
避免做法
- 避免在回调中阻塞 - 影响整体吞吐
- 避免频繁 ABORT - 导致消息重复读取
- 避免过大的 fragmentLimit - 延迟增加
- 避免重入 Aeron 客户端 - 未定义行为
- 避免忽略异常 - 正确处理所有错误
- 避免持有 buffer 引用 - 数据会被覆盖
性能调优建议
| 场景 | 建议 fragmentLimit | 推荐 Action 策略 |
|---|---|---|
| 低延迟交易系统 | 1 - 5 | 每条消息 COMMIT |
| 高吞吐量数据管道 | 100 - 256 | 批量 CONTINUE,最后 COMMIT |
| 日志收集 | 50 - 100 | CONTINUE(依赖 finally 提交) |
| 事务处理 | 10 - 50 | 事务边界 COMMIT,失败 ABORT |
总结
层层委托,职责分明
精确控制消息消费行为
透明处理大消息分片
基于 Aeron 版本:1.48.x
分析的核心文件:
io.aeron.Subscription |
io.aeron.Image |
io.aeron.ControlledFragmentAssembler |
io.aeron.logbuffer.ControlledFragmentHandler |
io.aeron.logbuffer.Header |
io.aeron.logbuffer.FrameDescriptor |
io.aeron.logbuffer.LogBufferDescriptor