Aeron Subscription.controlledPoll

深度解析受控轮询机制 - 递归剖析所有相关方法与类

概述

controlledPoll 是 Aeron 中用于受控消息轮询的核心方法。 与普通的 poll 方法不同,它允许消息处理器通过返回 Action 枚举值来精确控制消息消费的行为。

核心特性
  • 流量控制 - 消费者可以控制何时提交位置
  • 重试机制 - 支持 ABORT 操作重新处理消息
  • 灵活中断 - 支持 BREAK 操作提前结束轮询
  • 批量处理 - 支持多消息批量处理后统一提交

主要应用场景

事务性处理
确保消息处理成功后才提交位置,失败时可重试
背压控制
下游处理能力不足时,可通过 ABORT 暂停消费
批量确认
多条消息批量处理后使用 COMMIT 一次性确认
大消息重组
配合 Assembler 重组分片消息

整体架构

类关系图

graph TB subgraph Subscription层 A[Subscription] end subgraph Image层 B[Image] C[Image] D[Image] end subgraph Handler层 E[ControlledFragmentHandler] F[ControlledFragmentAssembler] end subgraph 辅助类 G[Header] H[FrameDescriptor] I[LogBufferDescriptor] end subgraph Action枚举 J[ABORT] K[BREAK] L[COMMIT] M[CONTINUE] end A -->|包含多个| B A -->|包含多个| C A -->|包含多个| D B -->|调用| E F -->|实现| E F -->|委托| E B -->|使用| G B -->|使用| H B -->|使用| I E -->|返回| J E -->|返回| K E -->|返回| L E -->|返回| M style A fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style B fill:#d4edda,stroke:#28a745,color:#1a1a1a style C fill:#d4edda,stroke:#28a745,color:#1a1a1a style D fill:#d4edda,stroke:#28a745,color:#1a1a1a style E fill:#fff3cd,stroke:#ffc107,color:#1a1a1a style F fill:#f8d7da,stroke:#dc3545,color:#1a1a1a

方法调用层次

graph LR subgraph 第一层 A[Subscription.controlledPoll] end subgraph 第二层 B[Image.controlledPoll] end subgraph 第三层 C[frameLengthVolatile] D[isPaddingFrame] E[handler.onFragment] end subgraph 第四层 F[Assembler.onFragment] G[delegate.onFragment] end A --> B B --> C B --> D B --> E E --> F F --> G style A fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style B fill:#d4edda,stroke:#28a745,color:#1a1a1a style E fill:#fff3cd,stroke:#ffc107,color:#1a1a1a style F fill:#f8d7da,stroke:#dc3545,color:#1a1a1a

Subscription.controlledPoll

Subscription.controlledPoll 是消息轮询的入口点,负责协调多个 Image(发布者连接)之间的消息读取。

方法定义

源码位置:io.aeron.Subscription 第 221-244 行

Subscription.java
/**
 * 以受控方式轮询订阅下的所有 Image,获取可用的消息片段。
 * 控制应用于流中的消息片段。即使某个流返回 BREAK 或 ABORT,
 * 其他流上如果有更多可读片段,仍会继续读取。
 *
 * @param fragmentHandler 处理每个消息片段的回调函数
 * @param fragmentLimit   跨多个 Image 轮询时的最大消息片段数
 * @return 接收到的消息片段数量
 */
public int controlledPoll(
        final ControlledFragmentHandler fragmentHandler,
        final int fragmentLimit) {

    // 获取当前订阅关联的所有 Image(每个 Image 代表一个发布者连接)
    final Image[] images = this.images;
    final int length = images.length;
    int fragmentsRead = 0;

    // 使用 Round-Robin 策略确定起始索引
    // 每次调用后 roundRobinIndex 递增,确保公平轮询
    int startingIndex = roundRobinIndex++;
    if (startingIndex >= length) {
        // 索引越界时重置为 0
        roundRobinIndex = startingIndex = 0;
    }

    // 第一轮遍历:从 startingIndex 到数组末尾
    for (int i = startingIndex; i < length && fragmentsRead < fragmentLimit; i++) {
        // 委托给 Image 进行实际的消息读取
        // 剩余配额 = fragmentLimit - 已读取数量
        fragmentsRead += images[i].controlledPoll(fragmentHandler, fragmentLimit - fragmentsRead);
    }

    // 第二轮遍历:从数组开头到 startingIndex(实现环形遍历)
    for (int i = 0; i < startingIndex && fragmentsRead < fragmentLimit; i++) {
        fragmentsRead += images[i].controlledPoll(fragmentHandler, fragmentLimit - fragmentsRead);
    }

    // 返回本次轮询读取的总片段数
    return fragmentsRead;
}

参数说明

参数名 类型 说明
fragmentHandler ControlledFragmentHandler 消息片段处理器,每读取到一个消息片段就会回调其 onFragment 方法
fragmentLimit int 本次轮询的最大消息片段数,用于限制单次轮询的工作量
返回值 int 实际读取到的消息片段数量

执行流程

Subscription.controlledPoll 执行流程图
flowchart TD A([开始]) --> B[/"获取 images 数组"/] B --> C[/"计算 startingIndex = roundRobinIndex++"/] C --> D{"startingIndex >= length ?"} D -->|是| E["重置: roundRobinIndex = 0\nstartingIndex = 0"] D -->|否| F[保持当前索引值] E --> G F --> G subgraph 第一轮遍历 G["i = startingIndex"] G --> H{"i < length AND\nfragmentsRead < limit ?"} H -->|是| I["调用 images[i].controlledPoll()"] I --> J["fragmentsRead += 返回值"] J --> K["i++"] K --> H end H -->|否| L subgraph 第二轮遍历 L["i = 0"] L --> M{"i < startingIndex AND\nfragmentsRead < limit ?"} M -->|是| N["调用 images[i].controlledPoll()"] N --> O["fragmentsRead += 返回值"] O --> P["i++"] P --> M end M -->|否| Q[/"返回 fragmentsRead"/] Q --> R([结束]) style A fill:#d4edda,stroke:#28a745,color:#1a1a1a style R fill:#f8d7da,stroke:#dc3545,color:#1a1a1a style I fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style N fill:#e8f0fe,stroke:#667eea,color:#1a1a1a
Round-Robin 策略说明

通过 roundRobinIndex 变量,每次轮询从不同的 Image 开始,确保多个发布者的消息能够公平地被消费,避免某个 Image 的消息长期得不到处理的"饥饿"问题。

Image.controlledPoll

Image.controlledPoll 是消息读取的核心实现,它直接操作底层的 Term Buffer 来读取消息帧。

核心逻辑

源码位置:io.aeron.Image 第 406-484 行

Image.java
public int controlledPoll(
        final ControlledFragmentHandler handler,
        final int fragmentLimit) {

    // 如果 Image 已关闭,直接返回 0
    if (isClosed) {
        return 0;
    }

    // ==================== 初始化阶段 ====================
    int fragmentsRead = 0;

    // 获取当前订阅者位置(绝对字节偏移量)
    long initialPosition = subscriberPosition.get();

    // 计算在当前 Term 内的偏移量
    // termLengthMask = termLength - 1, 用于快速取模运算
    int initialOffset = (int) initialPosition & termLengthMask;
    int offset = initialOffset;

    // 获取当前活动的 Term Buffer
    final UnsafeBuffer termBuffer = activeTermBuffer(initialPosition);
    final int capacity = termBuffer.capacity();

    // 设置 Header 的 buffer 引用
    final Header header = this.header;
    header.buffer(termBuffer);

    // ==================== 消息读取循环 ====================
    try {
        // 循环条件:未达到限制 AND 未超出容量 AND 未关闭
        while (fragmentsRead < fragmentLimit && offset < capacity && !isClosed) {

            // 使用 volatile 语义读取帧长度
            final int length = frameLengthVolatile(termBuffer, offset);

            // 长度 <= 0 表示没有更多数据
            if (length <= 0) {
                break;
            }

            // 保存当前帧的起始偏移量
            final int frameOffset = offset;

            // 计算对齐后的帧长度(32 字节对齐)
            final int alignedLength = BitUtil.align(length, FRAME_ALIGNMENT);
            offset += alignedLength;

            // 跳过填充帧(Term 末尾的对齐填充)
            if (isPaddingFrame(termBuffer, frameOffset)) {
                continue;
            }

            // 增加已读计数
            ++fragmentsRead;

            // 设置 Header 的偏移量
            header.offset(frameOffset);

            // ==================== 调用处理器 ====================
            // 注意:传入的是消息体部分(跳过 32 字节头)
            final Action action = handler.onFragment(
                termBuffer,
                frameOffset + HEADER_LENGTH,    // 数据起始位置
                length - HEADER_LENGTH,          // 数据长度
                header
            );

            // ==================== Action 处理 ====================
            if (ABORT == action) {
                // ABORT: 回滚本次读取,不计入统计
                --fragmentsRead;
                offset -= alignedLength;
                break;
            }

            if (BREAK == action) {
                // BREAK: 保留进度,提前结束循环
                break;
            }

            if (COMMIT == action) {
                // COMMIT: 立即更新订阅者位置
                initialPosition += (offset - initialOffset);
                initialOffset = offset;
                if (!isClosed) {
                    subscriberPosition.setRelease(initialPosition);
                }
            }
            // CONTINUE: 继续循环,最后统一更新位置
        }
    } catch (final Exception ex) {
        // 异常处理:通知错误处理器
        errorHandler.onError(ex);
    } finally {
        // ==================== 最终位置更新 ====================
        final long resultingPosition = initialPosition + (offset - initialOffset);
        if (resultingPosition > initialPosition && !isClosed) {
            subscriberPosition.setRelease(resultingPosition);
        }
    }

    return fragmentsRead;
}

Action 处理机制

Action 状态流转图
stateDiagram-v2 [*] --> 处理消息: 读取到新消息 处理消息 --> ABORT: handler返回ABORT 处理消息 --> BREAK: handler返回BREAK 处理消息 --> COMMIT: handler返回COMMIT 处理消息 --> CONTINUE: handler返回CONTINUE ABORT --> 回滚状态: fragmentsRead--\noffset回滚 回滚状态 --> [*]: 退出循环 BREAK --> 保留进度: 保持当前统计 保留进度 --> [*]: 退出循环 COMMIT --> 立即提交: subscriberPosition\n.setRelease() 立即提交 --> 处理消息: 继续处理下一条 CONTINUE --> 处理消息: 继续处理下一条

Action 对比表

Action fragmentsRead offset subscriberPosition 继续循环
ABORT -1(不计入) 回滚 不更新
BREAK 保持 保持 finally 中更新
COMMIT 保持 保持 立即更新
CONTINUE 保持 保持 finally 中更新

ControlledFragmentHandler

ControlledFragmentHandler 是一个函数式接口,定义了受控消息处理的回调规范。

ControlledFragmentHandler.java
@FunctionalInterface
public interface ControlledFragmentHandler {

    /**
     * 处理器返回后要采取的动作
     */
    enum Action {
        // 中止当前轮询,不提交当前消息的位置
        ABORT,

        // 中断当前轮询,提交到当前消息为止的位置
        BREAK,

        // 立即提交位置,然后继续处理后续消息
        COMMIT,

        // 继续处理,最后统一提交位置
        CONTINUE
    }

    /**
     * 处理从日志中读取的数据片段的回调方法
     *
     * 注意:在此回调中禁止重入调用 Aeron 客户端,否则会导致未定义行为
     *
     * @param buffer 包含数据的缓冲区
     * @param offset 数据开始的偏移量
     * @param length 数据的字节长度
     * @param header 包含数据元信息的头
     * @return 处理完成后关于流位置应采取的动作
     */
    Action onFragment(
        DirectBuffer buffer,
        int offset,
        int length,
        Header header
    );
}

Action 枚举详解

ABORT

行为:中止当前轮询操作,不提交当前消息片段的位置。

使用场景:

  • 处理过程中发生错误,需要重试
  • 下游缓冲区已满,需要等待
  • 事务处理失败,需要回滚
BREAK

行为:中断当前轮询操作,提交包括当前消息在内的所有已处理消息的位置。

使用场景:

  • 已处理到某个边界点,需要暂停
  • 检测到需要特殊处理的消息
  • 时间片用完,需要让出 CPU
COMMIT

行为:立即提交当前位置,然后继续处理后续消息。

使用场景:

  • 重要消息处理完毕,需要立即确认
  • 事务边界,需要持久化进度
  • 流量控制点,需要及时释放窗口
CONTINUE

行为:继续处理后续消息,在轮询结束后统一提交位置。

使用场景:

  • 正常的消息处理流程
  • 批量处理场景
  • 对实时性要求不高的场景

ControlledFragmentAssembler

ControlledFragmentAssembler 是一个装饰器类,用于自动重组分片消息。当消息大于 MTU 时,Aeron 会将其分片发送,此类负责在接收端将这些分片重新组装成完整消息。

消息组装逻辑

消息分片处理流程图
flowchart TD A([onFragment 调用]) --> B{"检查 flags 标志位"} B -->|"UNFRAGMENTED\n(BEGIN + END)"| C["完整消息\n直接委托给 delegate"] B -->|"BEGIN_FRAG_FLAG"| D["分片开始\n创建 BufferBuilder"] B -->|"中间或结束片段"| E["继续组装"] C --> C1["调用 delegate.onFragment()"] C1 --> C2[/"返回 delegate 的 Action"/] D --> D1["builder.reset()"] D1 --> D2["builder.captureHeader()"] D2 --> D3["builder.append()"] D3 --> D4["记录 nextTermOffset"] D4 --> D5[/"返回 CONTINUE"/] E --> E1{"获取 BufferBuilder"} E1 -->|"不存在"| E2[/"返回 CONTINUE"/] E1 -->|"存在"| E3{"termOffset 连续?"} E3 -->|"否"| E4["builder.reset()\n丢弃已收集数据"] E4 --> E2 E3 -->|"是"| E5["builder.append()"] E5 --> E6{"END_FRAG_FLAG?"} E6 -->|"否"| E7["更新 nextTermOffset"] E7 --> E2 E6 -->|"是"| E8["消息组装完成"] E8 --> E9["调用 delegate.onFragment()"] E9 --> E10{"action == ABORT?"} E10 -->|"是"| E11["回滚 builder.limit()"] E10 -->|"否"| E12["builder.reset()"] E11 --> E13[/"返回 action"/] E12 --> E13 style A fill:#d4edda,stroke:#28a745,color:#1a1a1a style C fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style D fill:#fff3cd,stroke:#ffc107,color:#1a1a1a style E8 fill:#d4edda,stroke:#28a745,color:#1a1a1a

分片消息标志位

消息标志位结构
flowchart LR subgraph FLAGS["Flags 字节"] direction LR B7["Bit7\nBEGIN"] B6["Bit6\nEND"] B5["Bit5"] B4["Bit4"] B3["Bit3"] B2["Bit2"] B1["Bit1"] B0["Bit0"] end subgraph 非分片消息 U["UNFRAGMENTED\n0b1100_0000\nBEGIN=1, END=1"] end subgraph 分片消息 F1["第一片\n0b1000_0000\nBEGIN=1, END=0"] F2["中间片\n0b0000_0000\nBEGIN=0, END=0"] F3["最后片\n0b0100_0000\nBEGIN=0, END=1"] end style B7 fill:#f8d7da,stroke:#dc3545,color:#1a1a1a style B6 fill:#d4edda,stroke:#28a745,color:#1a1a1a style U fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style F1 fill:#fff3cd,stroke:#ffc107,color:#1a1a1a style F3 fill:#d4edda,stroke:#28a745,color:#1a1a1a
ControlledFragmentAssembler.java - onFragment()
public Action onFragment(
        final DirectBuffer buffer,
        final int offset,
        final int length,
        final Header header) {

    // 获取帧的标志位
    final byte flags = header.flags();
    Action action = Action.CONTINUE;

    // ==================== 情况1: 非分片消息 ====================
    // 当 BEGIN 和 END 标志都设置时,表示这是完整的消息
    if ((flags & UNFRAGMENTED) == UNFRAGMENTED) {
        // 直接委托给业务处理器,不需要组装
        action = delegate.onFragment(buffer, offset, length, header);
    }
    // ==================== 情况2: 分片消息的第一个片段 ====================
    else if ((flags & BEGIN_FRAG_FLAG) == BEGIN_FRAG_FLAG) {
        // 获取或创建该 session 的 BufferBuilder
        final BufferBuilder builder = getBufferBuilder(header.sessionId());

        // 重置并开始收集新的分片消息
        builder.reset()                               // 清空之前的数据
               .captureHeader(header)                 // 保存第一片的 Header
               .append(buffer, offset, length)       // 追加第一片数据
               .nextTermOffset(header.nextTermOffset()); // 记录下一片的预期位置
    }
    // ==================== 情况3: 中间片段或最后片段 ====================
    else {
        // 尝试获取已存在的 BufferBuilder
        final BufferBuilder builder = builderBySessionIdMap.get(header.sessionId());

        if (null != builder) {
            // 验证片段的连续性(防止乱序或丢失)
            if (header.termOffset() == builder.nextTermOffset()) {
                // 保存追加前的位置(用于 ABORT 回滚)
                final int limit = builder.limit();

                // 追加当前片段
                builder.append(buffer, offset, length);

                // 检查是否为最后一个片段
                if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG) {
                    // ===== 消息组装完成,调用业务处理器 =====
                    action = delegate.onFragment(
                        builder.buffer(),         // 完整消息的缓冲区
                        0,                        // 从头开始
                        builder.limit(),          // 完整消息的长度
                        builder.completeHeader(header)  // 组合后的 Header
                    );

                    // 根据处理结果决定是否回滚
                    if (Action.ABORT == action) {
                        // ABORT: 回滚到追加前的状态,下次重新组装
                        builder.limit(limit);
                    } else {
                        // 其他 Action: 清空缓冲区,准备下一条消息
                        builder.reset();
                    }
                } else {
                    // 还不是最后片段,记录下一片的预期位置
                    builder.nextTermOffset(header.nextTermOffset());
                }
            } else {
                // 片段不连续(乱序或丢失),丢弃已收集的数据
                builder.reset();
            }
        }
    }

    return action;
}

辅助类详解

Header 类 - 帧头元数据

源码位置:io.aeron.logbuffer.Header

帧头结构布局(32 字节)
block-beta columns 8 block:row1:8 A["Frame Length\n(4 bytes)\n偏移: 0"] B["Version\n(1 byte)\n偏移: 4"] C["Flags\n(1 byte)\n偏移: 5"] D["Type\n(2 bytes)\n偏移: 6"] end block:row2:8 E["Term Offset\n(4 bytes)\n偏移: 8"] F["Session ID\n(4 bytes)\n偏移: 12"] G["Stream ID\n(4 bytes)\n偏移: 16"] H["Term ID\n(4 bytes)\n偏移: 20"] end block:row3:8 I["Reserved Value\n(8 bytes)\n偏移: 24-31"] end style A fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style C fill:#f8d7da,stroke:#dc3545,color:#1a1a1a style F fill:#d4edda,stroke:#28a745,color:#1a1a1a style I fill:#fff3cd,stroke:#ffc107,color:#1a1a1a

Header 主要方法

方法 返回类型 说明
frameLength()int帧的总长度(包括头部 32 字节)
sessionId()int会话 ID,标识发布者
streamId()int流 ID
termId()intTerm ID,用于计算位置
termOffset()int在 Term 内的偏移量
flags()byte标志位(BEGIN/END)
position()long计算消息的绝对位置
reservedValue()long保留值(可用于自定义元数据)

LogBufferDescriptor - 日志缓冲区结构

源码位置:io.aeron.logbuffer.LogBufferDescriptor

Log Buffer 内存布局
flowchart TB subgraph LogBuffer["Log Buffer 总体结构"] direction TB T0["Term 0\n(termLength 字节)\n索引: 0"] T1["Term 1\n(termLength 字节)\n索引: 1"] T2["Term 2\n(termLength 字节)\n索引: 2"] META["Log Meta Data\n(4 KB)"] end T0 --> T1 T1 --> T2 T2 --> META style T0 fill:#e8f0fe,stroke:#667eea,color:#1a1a1a style T1 fill:#d4edda,stroke:#28a745,color:#1a1a1a style T2 fill:#fff3cd,stroke:#ffc107,color:#1a1a1a style META fill:#f8d7da,stroke:#dc3545,color:#1a1a1a
LogBufferDescriptor.java - 关键方法
/**
 * 根据位置计算 Term 索引(0, 1, 或 2)
 * 使用位运算实现高效的取模操作
 */
public static int indexByPosition(
        final long position,
        final int positionBitsToShift) {

    // position >>> positionBitsToShift 相当于 position / termLength
    // % PARTITION_COUNT 取模得到索引 (0, 1, 2)
    return (int) ((position >>> positionBitsToShift) % PARTITION_COUNT);
}

/**
 * 计算绝对位置
 * position = (termId - initialTermId) * termLength + termOffset
 */
public static long computePosition(
        final int activeTermId,
        final int termOffset,
        final int positionBitsToShift,
        final int initialTermId) {

    // 计算已经过的 Term 数量(支持 termId 溢出后的回绕)
    final long termCount = activeTermId - initialTermId;

    // termCount << positionBitsToShift 相当于 termCount * termLength
    return (termCount << positionBitsToShift) + termOffset;
}
positionBitsToShift 计算规则

该值由 termLength 决定,等于 log2(termLength):

  • termLength = 64KB → positionBitsToShift = 16
  • termLength = 1MB → positionBitsToShift = 20
  • termLength = 16MB → positionBitsToShift = 24

完整调用链

端到端调用时序图
sequenceDiagram autonumber participant App as 应用代码 participant Sub as Subscription participant Img as Image participant Asm as Assembler participant Hdl as 业务Handler participant Pos as Position App->>Sub: controlledPoll(handler, 10) loop 遍历每个 Image Sub->>Img: controlledPoll(handler, remaining) Img->>Pos: get() 获取当前位置 Pos-->>Img: initialPosition loop 读取消息帧 Note over Img: 读取帧长度 Note over Img: 检查是否填充帧 Img->>Asm: onFragment(buffer, offset, len, header) alt 完整消息 Asm->>Hdl: onFragment(完整消息) Hdl-->>Asm: Action else 分片开始 Note over Asm: 开始收集 else 分片结束 Asm->>Hdl: onFragment(组装后消息) Hdl-->>Asm: Action end Asm-->>Img: Action alt ABORT Note over Img: 回滚 offset else COMMIT Img->>Pos: setRelease(position) end end Img->>Pos: setRelease(finalPosition) Img-->>Sub: fragmentsRead end Sub-->>App: totalFragmentsRead

调用栈总结

Call Stack
Subscription.controlledPoll(handler, limit)
│
├── Image.controlledPoll(handler, limit)
│     │
│     ├── subscriberPosition.get()
│     │
│     ├── activeTermBuffer(position)
│     │     └── LogBufferDescriptor.indexByPosition()
│     │
│     └── [LOOP] 读取消息帧
│           │
│           ├── FrameDescriptor.frameLengthVolatile()
│           ├── BitUtil.align()
│           ├── FrameDescriptor.isPaddingFrame()
│           ├── Header.offset()
│           │
│           ├── handler.onFragment()
│           │     │
│           │     ├── [UNFRAGMENTED] delegate.onFragment()
│           │     │
│           │     └── [FRAGMENTED]
│           │           ├── BufferBuilder.reset()
│           │           ├── BufferBuilder.append()
│           │           └── [END] delegate.onFragment()
│           │
│           └── [处理 Action]
│                 ├── ABORT: 回滚 offset
│                 ├── BREAK: 退出循环
│                 ├── COMMIT: subscriberPosition.setRelease()
│                 └── CONTINUE: 继续
│
└── [FINALLY] subscriberPosition.setRelease()

使用示例

BasicUsage.java
// 创建订阅
Subscription subscription = aeron.addSubscription(
    "aeron:udp?endpoint=localhost:40123",
    streamId
);

// 定义消息处理器
ControlledFragmentHandler handler = (buffer, offset, length, header) -> {
    try {
        // 处理消息
        processMessage(buffer, offset, length);
        return Action.CONTINUE;
    } catch (Exception e) {
        // 处理失败,不提交位置,下次重试
        return Action.ABORT;
    }
};

// 轮询消息
while (running) {
    int fragmentsRead = subscription.controlledPoll(handler, 10);
    if (fragmentsRead == 0) {
        idleStrategy.idle();
    }
}

LargeMessageUsage.java
// 业务处理器(只接收完整消息)
ControlledFragmentHandler businessHandler = (buffer, offset, length, header) -> {
    System.out.println("收到完整消息,长度: " + length);
    return Action.CONTINUE;
};

// 使用 Assembler 包装,自动处理分片重组
ControlledFragmentAssembler assembler = new ControlledFragmentAssembler(
    businessHandler,
    1024 * 1024,  // 初始缓冲区大小: 1MB
    false          // 不使用 DirectByteBuffer
);

// 轮询时使用 assembler
subscription.controlledPoll(assembler, 10);

TransactionalUsage.java
final List<Message> batch = new ArrayList<>();
final int BATCH_SIZE = 100;

ControlledFragmentHandler transactionHandler = (buffer, offset, length, header) -> {
    Message msg = parseMessage(buffer, offset, length);
    batch.add(msg);

    // 达到批次大小时处理
    if (batch.size() >= BATCH_SIZE) {
        try {
            // 批量处理(例如写入数据库)
            processBatch(batch);
            batch.clear();
            // 批量处理成功,立即提交位置
            return Action.COMMIT;
        } catch (Exception e) {
            batch.clear();
            // 处理失败,中止并重试整个批次
            return Action.ABORT;
        }
    }

    // 继续收集批次
    return Action.CONTINUE;
};

BackpressureUsage.java
final BlockingQueue<Message> outputQueue =
    new LinkedBlockingQueue<>(1000);

ControlledFragmentHandler backpressureHandler = (buffer, offset, length, header) -> {
    Message msg = parseMessage(buffer, offset, length);

    // 检查下游队列容量
    if (outputQueue.remainingCapacity() == 0) {
        // 队列已满,暂停消费
        // ABORT 不会提交当前消息,下次轮询会重新读取
        return Action.ABORT;
    }

    // 队列有空间,放入消息
    outputQueue.offer(msg);
    return Action.CONTINUE;
};

最佳实践

推荐做法
  • 优先使用 CONTINUE - 获得最佳性能
  • 仅在事务边界使用 COMMIT - 减少 I/O 开销
  • 异常时返回 ABORT - 确保消息不丢失
  • 合理设置 fragmentLimit - 平衡延迟与吞吐
  • 复用 Assembler 实例 - 避免重复创建
  • 及时清理 BufferBuilder - 释放内存
避免做法
  • 避免在回调中阻塞 - 影响整体吞吐
  • 避免频繁 ABORT - 导致消息重复读取
  • 避免过大的 fragmentLimit - 延迟增加
  • 避免重入 Aeron 客户端 - 未定义行为
  • 避免忽略异常 - 正确处理所有错误
  • 避免持有 buffer 引用 - 数据会被覆盖

性能调优建议

场景 建议 fragmentLimit 推荐 Action 策略
低延迟交易系统 1 - 5 每条消息 COMMIT
高吞吐量数据管道 100 - 256 批量 CONTINUE,最后 COMMIT
日志收集 50 - 100 CONTINUE(依赖 finally 提交)
事务处理 10 - 50 事务边界 COMMIT,失败 ABORT

总结

三层架构
Subscription → Image → Handler
层层委托,职责分明
四种 Action
ABORT / BREAK / COMMIT / CONTINUE
精确控制消息消费行为
分片重组
ControlledFragmentAssembler
透明处理大消息分片
文档版本信息

基于 Aeron 版本:1.48.x

分析的核心文件:

io.aeron.Subscription | io.aeron.Image | io.aeron.ControlledFragmentAssembler | io.aeron.logbuffer.ControlledFragmentHandler | io.aeron.logbuffer.Header | io.aeron.logbuffer.FrameDescriptor | io.aeron.logbuffer.LogBufferDescriptor