Aeron Publication.offer() 返回值完全指南
深入理解Aeron消息发布的每一种返回状态及其处理策略
📖 前言
在使用 Aeron 进行高性能消息传输时,Publication.offer() 方法是发送消息的核心接口。该方法返回一个 long 类型的值,
当返回值 ≥ 0 时表示发送成功(返回值是消息在流中的位置),当返回值 < 0 时表示发送失败或需要重试。
理解这些负数返回值的含义对于构建可靠的、高性能的消息系统至关重要。本文档详细解析了 Aeron Publication 的 5 种负数返回值, 并提供了生产级的处理策略和代码示例。
适用场景
- 撮合引擎:订单事件发布到 Aeron Archive
- 市场数据服务:深度推送到 Aeron IPC channel
- 分布式系统:Aeron Cluster 节点间通信
- 高频交易系统:低延迟消息传输
🎯 返回值概览
| 返回值常量 | 数值 | 含义 | 严重程度 | 是否可重试 |
|---|---|---|---|---|
NOT_CONNECTED |
-1 | Publication 没有订阅者 | 临时 | 是 |
BACK_PRESSURED |
-2 | 订阅者处理速度跟不上 | 注意 | 是 |
ADMIN_ACTION |
-3 | 内部维护操作进行中 | 瞬态 | 立即 |
CLOSED |
-4 | Publication 已关闭 | 严重 | 否 |
MAX_POSITION_EXCEEDED |
-5 | 流位置达到上限 | 严重 | 否 |
重要提示
在生产环境中,ADMIN_ACTION 和 BACK_PRESSURED 是最常见的返回值,需要正确处理才能保证系统的可靠性。 CLOSED 和 MAX_POSITION_EXCEEDED 表示不可恢复的错误,必须采取特殊处理措施。
🔵 NOT_CONNECTED (-1)
含义
NOT_CONNECTED 表示当前 Publication 没有任何订阅者(Subscriber)连接。这通常是一个临时状态,
发生在:
- 订阅者尚未启动
- 订阅者正在重新连接
- 网络短暂中断
- 订阅者进程崩溃后正在恢复
处理策略
策略: 等待订阅者连接,使用 Thread.yield() 让出 CPU,然后重试。
代码示例
long result = publication.offer(buffer, 0, length);
if (result == Publication.NOT_CONNECTED) {
// 没有订阅者,让出CPU并重试
Thread.yield();
// 或者短暂睡眠
// Thread.sleep(1);
}
// 典型场景:订阅者启动需要时间
while (publication.offer(buffer, 0, length) == Publication.NOT_CONNECTED) {
System.out.println("等待订阅者连接...");
Thread.yield();
}
生产环境注意事项
- 设置超时:不能无限等待,应设置最大重试次数或超时时间
- 监控告警:如果长时间(如 >1 秒)处于 NOT_CONNECTED 状态,应触发告警
- 区分场景:
- IPC channel:订阅者在同一进程内,NOT_CONNECTED 应该很快恢复
- UDP/Network channel:可能需要更长的等待时间
🟡 BACK_PRESSURED (-2)
含义
BACK_PRESSURED 表示订阅者的处理速度跟不上发布者的发送速度,导致流量控制(Flow Control)触发。
Aeron 使用环形缓冲区(Term Buffer),当订阅者消费速度慢时,缓冲区会被填满,此时发布者必须等待。
发生原因
- 订阅者处理逻辑过重:例如订阅者需要将消息写入数据库
- 订阅者 GC 停顿:JVM Full GC 导致订阅者暂停消费
- 网络延迟:UDP 网络出现丢包或延迟
- 发布速率过高:发布者发送速度超过系统设计容量
处理策略
策略: 使用 Aeron 提供的 IdleStrategy 进行退避(Backoff),避免空转消耗 CPU。
常用策略:BackoffIdleStrategy、BusySpinIdleStrategy、YieldingIdleStrategy
代码示例
// 使用 BackoffIdleStrategy 进行退避
IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, // maxSpins
10, // maxYields
1, // minParkPeriodNs
1000_000 // maxParkPeriodNs (1ms)
);
long result = publication.offer(buffer, 0, length);
if (result == Publication.BACK_PRESSURED) {
// 订阅者跟不上,使用退避策略
idleStrategy.idle();
}
// 完整的重试循环
long deadline = System.nanoTime() + timeoutNs;
long result;
while ((result = publication.offer(buffer, 0, length)) == Publication.BACK_PRESSURED) {
if (System.nanoTime() > deadline) {
throw new TimeoutException("发送超时");
}
idleStrategy.idle();
}
idleStrategy.reset(); // 重置策略状态
BackoffIdleStrategy 行为
graph LR
Start[开始重试] --> Spin{尝试次数
< maxSpins?}
Spin -->|是| SpinWait[空循环等待
不让出CPU]
Spin -->|否| Yield{尝试次数
< maxYields?}
Yield -->|是| YieldCPU[Thread.yield
让出CPU]
Yield -->|否| Park[LockSupport.park
线程休眠]
SpinWait --> Retry[重试offer]
YieldCPU --> Retry
Park --> Retry
Retry --> Success{成功?}
Success -->|是| End[结束]
Success -->|否| Spin
style SpinWait fill:#d1e7dd
style YieldCPU fill:#fff3cd
style Park fill:#f8d7da
生产环境注意事项
- 监控 BACK_PRESSURED 频率:如果频繁出现,说明系统设计存在瓶颈
- 优化订阅者性能:
- 使用批量处理(FragmentAssembler)
- 异步写入数据库(批量提交)
- 避免在订阅者线程中执行重 IO 操作
- 调整 Term Buffer 大小:增大
term-buffer-length可以提供更大的缓冲空间 - 流量控制策略:考虑使用
MinMulticastFlowControl或MaxMulticastFlowControl
🟠 ADMIN_ACTION (-3)
含义
ADMIN_ACTION 表示 Aeron 内部正在执行管理维护操作,这是一个极短暂的瞬态状态(通常 <1 毫秒)。
这些操作是 Aeron 正常运行所必需的,发布者应该立即重试,无需等待。
关键特点
- 持续时间:5-500 微秒(通常 <100 微秒)
- 频率:取决于 Term Buffer 大小和消息发送速率
- 处理:立即重试,无需休眠或退避
1. Term Buffer 日志轮转(Log Rotation)
Aeron 使用三个 Term Buffer 组成环形缓冲区,当一个 Term Buffer 写满时,需要切换到下一个 Term Buffer。
Term Buffer 结构
Term Buffer 0 (1GB) → 写满 → 切换
Term Buffer 1 (1GB) → 写满 → 切换
Term Buffer 2 (1GB) → 写满 → 回到 Term Buffer 0
每个 Term Buffer 的大小由 term-buffer-length 配置决定
默认值:16MB - 1GB(必须是 2 的幂次)
日志轮转流程
- 检测当前 Term Buffer 已满
- 切换到下一个 Term Buffer
- 更新 active term id
- 更新 tail position
- 重试消息发送
持续时间: 5-50 微秒
2. Aeron Archive Recording 分段切换
当使用 Aeron Archive 进行持久化时,Recording 文件会按照固定大小分段存储。当一个 Segment 文件写满时,需要创建新的 Segment 文件。
Archive Segment 结构
/archive-dir/
├── recording-100001-0-0.rec ← Segment 0
├── recording-100001-0-134217728.rec ← Segment 1 (128MB 后)
├── recording-100001-0-268435456.rec ← Segment 2 (256MB 后)
└── recording-100001-0-402653184.rec ← Segment 3 (384MB 后)
Segment 大小由 segment-file-length 配置决定
默认值:128MB
Segment 切换流程
- 检测当前 Segment 文件已满
- 关闭当前 Segment 文件描述符
- 创建新的 Segment 文件
- 打开新文件描述符并初始化
- 更新 Archive Catalog 元数据
- 重试消息写入
持续时间: 100-500 微秒(涉及文件系统操作)
3. 缓冲区清理与内存管理
Aeron 在运行过程中会执行一些内存管理操作,例如清理已消费的消息、回收不再使用的内存。
缓冲区清理操作
- 已消费数据清理: 当所有订阅者都已消费某段数据后,该段内存可以被重用
- 过期订阅者清理: 移除长时间无活动的订阅者
- 心跳检查: Media Driver 执行心跳检测,更新活跃状态
- 统计信息更新: 更新计数器和性能指标
持续时间: 5-20 微秒
4. ADMIN_ACTION 持续时间分析
| 操作类型 | 典型持续时间 | 最大持续时间 | 频率 |
|---|---|---|---|
| Term Buffer 轮转 | 5-50 μs | 100 μs | 每填满 1 个 Term Buffer(如 1GB) |
| Archive Segment 切换 | 100-500 μs | 1 ms | 每填满 1 个 Segment(如 128MB) |
| 缓冲区清理 | 5-20 μs | 50 μs | 周期性(如每 1ms) |
| Media Driver 维护 | 10-30 μs | 100 μs | 周期性(如每 10ms) |
处理策略
策略: 立即重试,无需任何等待或退避。ADMIN_ACTION 持续时间极短,直接循环重试即可。
代码示例
long result = publication.offer(buffer, 0, length);
if (result == Publication.ADMIN_ACTION) {
// 立即重试,无需等待
result = publication.offer(buffer, 0, length);
}
// 或者在循环中直接 continue
while (true) {
long result = publication.offer(buffer, 0, length);
if (result >= 0) {
break; // 发送成功
}
switch ((int) result) {
case (int) Publication.ADMIN_ACTION:
continue; // 立即重试,不执行任何等待
case (int) Publication.BACK_PRESSURED:
idleStrategy.idle();
break;
// ... 其他情况
}
}
性能影响
- 对延迟的影响: 通常增加 <1 微秒延迟,可以忽略不计
- 对吞吐量的影响: 几乎无影响,因为操作极其快速
- 重试次数: 通常只需重试 1-2 次即可成功
- 监控指标: 如果 ADMIN_ACTION 频繁出现且重试多次才成功,可能需要调优配置
🔴 CLOSED (-4)
含义
CLOSED 表示 Publication 已经被关闭,这是一个不可恢复的错误状态。一旦 Publication 被关闭,就无法再发送消息。
发生原因
- 显式调用了
publication.close() - Aeron Media Driver 停止运行
- Aeron Client 上下文(Aeron context)被关闭
- 程序正在关闭(JVM shutdown)
处理策略
策略: 抛出异常,停止发送。不应重试,必须重新创建 Publication。
如果需要继续发送消息,必须重新创建 Publication 实例。
代码示例
long result = publication.offer(buffer, 0, length);
if (result == Publication.CLOSED) {
throw new IllegalStateException(
"Publication 已关闭,无法发送消息。" +
"Channel: " + publication.channel() +
", StreamId: " + publication.streamId()
);
}
// 在生产环境中,可以尝试重新创建 Publication
if (result == Publication.CLOSED) {
logger.error("Publication 已关闭,尝试重新创建");
// 关闭旧的 Publication(可能已经关闭)
try {
publication.close();
} catch (Exception e) {
// 忽略异常
}
// 重新创建 Publication
publication = aeron.addExclusivePublication(channel, streamId);
// 等待 Publication 连接
while (!publication.isConnected()) {
Thread.yield();
}
// 重试发送
result = publication.offer(buffer, 0, length);
}
生产环境注意事项
- 监控告警: CLOSED 错误应触发高优先级告警
- 优雅关闭: 在关闭应用时,应先停止发送消息,再关闭 Publication
- 资源管理: 使用 try-with-resources 或在 finally 块中关闭 Publication
- 重连机制: 如果需要自动恢复,应实现 Publication 的重连逻辑
⚫ MAX_POSITION_EXCEEDED (-5)
含义
MAX_POSITION_EXCEEDED 表示流的位置(stream position)已经达到最大值(2^61 - 1 字节),无法继续发送消息。
这是一个极其罕见的错误,在正常情况下几乎不会发生。
位置限制计算
最大位置:2^61 - 1 = 2,305,843,009,213,693,951 字节
≈ 2.3 EB (Exabytes)
≈ 2,305,843 TB (Terabytes)
假设消息发送速率为 1 GB/s(极高速率):
达到上限所需时间 ≈ 73 年
假设消息发送速率为 100 MB/s(常见速率):
达到上限所需时间 ≈ 730 年
发生原因
- 流长期运行且从未重启(几十年)
- Position 计数器溢出(理论情况)
- Bug 导致 Position 计算错误
处理策略
策略: 创建新的 Stream(使用新的 streamId),然后将订阅者迁移到新流。
代码示例
long result = publication.offer(buffer, 0, length);
if (result == Publication.MAX_POSITION_EXCEEDED) {
logger.error("流位置达到上限!当前 streamId: {}", publication.streamId());
// 1. 通知订阅者切换到新流
notifySubscribersToSwitchStream(newStreamId);
// 2. 创建新的 Publication(使用新的 streamId)
int newStreamId = publication.streamId() + 1;
Publication newPublication = aeron.addExclusivePublication(
publication.channel(),
newStreamId
);
// 3. 等待新 Publication 连接
while (!newPublication.isConnected()) {
Thread.yield();
}
// 4. 切换到新 Publication
Publication oldPublication = publication;
publication = newPublication;
oldPublication.close();
// 5. 发送消息到新流
result = publication.offer(buffer, 0, length);
}
生产环境注意事项
- 预防措施: 定期重启服务,避免流位置无限增长
- 监控 Position: 监控流位置,当接近上限时提前采取措施
- 流管理策略: 可以设计流的轮转机制,例如每天或每周切换到新流
- 订阅者协调: 确保所有订阅者能够平滑切换到新流
📊 处理决策流程
graph TB
Start[调用 publication.offer] --> Check{返回值判断}
Check -->|≥ 0| Success[发送成功
返回位置]
Check -->|-1
NOT_CONNECTED| NC[没有订阅者]
NC --> NC_Action[Thread.yield
然后重试]
NC_Action --> Start
Check -->|-2
BACK_PRESSURED| BP[流量控制触发]
BP --> BP_Action[IdleStrategy.idle
退避等待]
BP_Action --> Start
Check -->|-3
ADMIN_ACTION| AA[内部维护操作]
AA --> AA_Action[立即重试
无需等待]
AA_Action --> Start
Check -->|-4
CLOSED| Closed[Publication已关闭]
Closed --> Closed_Action[抛出异常
或重新创建]
Check -->|-5
MAX_POSITION| MaxPos[位置达到上限]
MaxPos --> MaxPos_Action[创建新Stream
迁移订阅者]
Success --> End[完成]
Closed_Action --> End
MaxPos_Action --> End
style Success fill:#d1e7dd
style NC fill:#cfe2ff
style BP fill:#fff3cd
style AA fill:#ffe5d0
style Closed fill:#f8d7da
style MaxPos fill:#212529,color:#fff
💎 生产级 ReliablePublisher 实现
以下是一个生产环境可用的可靠消息发布器实现,正确处理了所有返回值情况,并提供了超时、重试、监控等功能。
import io.aeron.Publication;
import io.aeron.logbuffer.BufferClaim;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
* 生产级可靠消息发布器
*
* 特性:
* 1. 正确处理所有 Publication.offer() 返回值
* 2. 超时机制,避免无限等待
* 3. 退避策略,优化 CPU 使用
* 4. 性能指标收集
* 5. 自动重连机制(可选)
*/
public class ReliablePublisher {
private final Publication publication;
private final IdleStrategy idleStrategy;
private final long timeoutNs;
// 性能指标
private final AtomicLong successCount = new AtomicLong(0);
private final AtomicLong backPressuredCount = new AtomicLong(0);
private final AtomicLong adminActionCount = new AtomicLong(0);
private final AtomicLong notConnectedCount = new AtomicLong(0);
public ReliablePublisher(Publication publication, long timeoutMs) {
this.publication = publication;
this.timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
this.idleStrategy = new BackoffIdleStrategy(
100, // maxSpins
10, // maxYields
1, // minParkPeriodNs
1000_000 // maxParkPeriodNs (1ms)
);
}
/**
* 可靠地发送消息,处理所有错误情况
*
* @param buffer 消息缓冲区
* @param offset 起始偏移量
* @param length 消息长度
* @return 消息在流中的位置
* @throws TimeoutException 如果超时
* @throws IllegalStateException 如果 Publication 已关闭
*/
public long offer(DirectBuffer buffer, int offset, int length)
throws TimeoutException {
long deadline = System.nanoTime() + timeoutNs;
long result;
while (true) {
result = publication.offer(buffer, offset, length);
// 发送成功
if (result >= 0) {
successCount.incrementAndGet();
idleStrategy.reset();
return result;
}
// 检查超时
if (System.nanoTime() > deadline) {
throw new TimeoutException(
String.format("发送超时,最后返回值: %d", result)
);
}
// 根据返回值采取不同策略
switch ((int) result) {
case (int) Publication.ADMIN_ACTION:
// 立即重试,不等待
adminActionCount.incrementAndGet();
continue;
case (int) Publication.BACK_PRESSURED:
// 订阅者跟不上,使用退避策略
backPressuredCount.incrementAndGet();
idleStrategy.idle();
break;
case (int) Publication.NOT_CONNECTED:
// 没有订阅者,让出 CPU
notConnectedCount.incrementAndGet();
Thread.yield();
break;
case (int) Publication.CLOSED:
throw new IllegalStateException(
"Publication 已关闭,无法发送消息。" +
"Channel: " + publication.channel() +
", StreamId: " + publication.streamId()
);
case (int) Publication.MAX_POSITION_EXCEEDED:
throw new IllegalStateException(
"流位置达到上限,需要创建新流。" +
"当前 StreamId: " + publication.streamId()
);
default:
throw new IllegalStateException(
"未知的返回值: " + result
);
}
}
}
/**
* 使用 BufferClaim 零拷贝方式发送消息
*/
public long tryClaim(int length, BufferClaim bufferClaim)
throws TimeoutException {
long deadline = System.nanoTime() + timeoutNs;
long result;
while (true) {
result = publication.tryClaim(length, bufferClaim);
if (result >= 0) {
successCount.incrementAndGet();
idleStrategy.reset();
return result;
}
if (System.nanoTime() > deadline) {
throw new TimeoutException(
String.format("Claim 超时,最后返回值: %d", result)
);
}
switch ((int) result) {
case (int) Publication.ADMIN_ACTION:
adminActionCount.incrementAndGet();
continue;
case (int) Publication.BACK_PRESSURED:
backPressuredCount.incrementAndGet();
idleStrategy.idle();
break;
case (int) Publication.NOT_CONNECTED:
notConnectedCount.incrementAndGet();
Thread.yield();
break;
case (int) Publication.CLOSED:
throw new IllegalStateException("Publication 已关闭");
case (int) Publication.MAX_POSITION_EXCEEDED:
throw new IllegalStateException("流位置达到上限");
}
}
}
/**
* 获取性能指标
*/
public PublisherMetrics getMetrics() {
return new PublisherMetrics(
successCount.get(),
backPressuredCount.get(),
adminActionCount.get(),
notConnectedCount.get()
);
}
/**
* 重置性能指标
*/
public void resetMetrics() {
successCount.set(0);
backPressuredCount.set(0);
adminActionCount.set(0);
notConnectedCount.set(0);
}
/**
* 性能指标类
*/
public static class PublisherMetrics {
public final long successCount;
public final long backPressuredCount;
public final long adminActionCount;
public final long notConnectedCount;
public PublisherMetrics(long successCount, long backPressuredCount,
long adminActionCount, long notConnectedCount) {
this.successCount = successCount;
this.backPressuredCount = backPressuredCount;
this.adminActionCount = adminActionCount;
this.notConnectedCount = notConnectedCount;
}
public long getTotalRetries() {
return backPressuredCount + adminActionCount + notConnectedCount;
}
public double getRetryRate() {
if (successCount == 0) return 0.0;
return (double) getTotalRetries() / successCount;
}
@Override
public String toString() {
return String.format(
"PublisherMetrics{success=%d, backPressured=%d, " +
"adminAction=%d, notConnected=%d, retryRate=%.2f%%}",
successCount, backPressuredCount, adminActionCount,
notConnectedCount, getRetryRate() * 100
);
}
}
}
// 使用示例
public class MatchingEngineExample {
private final ReliablePublisher publisher;
public void publishMatchingEvent(DirectBuffer buffer, int offset, int length) {
try {
long position = publisher.offer(buffer, offset, length);
System.out.println("事件发送成功,位置: " + position);
} catch (TimeoutException e) {
System.err.println("发送超时: " + e.getMessage());
// 记录日志,触发告警
} catch (IllegalStateException e) {
System.err.println("Publication 错误: " + e.getMessage());
// 尝试重新创建 Publication
}
}
public void printMetrics() {
ReliablePublisher.PublisherMetrics metrics = publisher.getMetrics();
System.out.println(metrics);
// 如果重试率过高,触发告警
if (metrics.getRetryRate() > 0.1) { // 超过 10%
System.err.println("警告:重试率过高,可能存在性能瓶颈");
}
}
}
📋 返回值对比总结
| 返回值 | 数值 | 含义 | 处理策略 | 等待时间 | 严重程度 |
|---|---|---|---|---|---|
NOT_CONNECTED |
-1 | 没有订阅者 | Thread.yield() + 重试 |
1-10 ms | 低 |
BACK_PRESSURED |
-2 | 流量控制 | IdleStrategy.idle() + 重试 |
动态退避 | 中 |
ADMIN_ACTION |
-3 | 内部维护 | 立即重试,无需等待 | 0 ms | 低 |
CLOSED |
-4 | Publication 关闭 | 抛出异常或重建 | N/A | 高 |
MAX_POSITION_EXCEEDED |
-5 | 位置达上限 | 创建新流,迁移订阅者 | N/A | 高 |
关键要点
- ADMIN_ACTION:最容易被误解,实际上应该立即重试,无需等待
- BACK_PRESSURED:最常见的重试场景,必须使用IdleStrategy避免 CPU 空转
- NOT_CONNECTED:临时状态,通常很快恢复,使用
Thread.yield()即可 - CLOSED 和 MAX_POSITION_EXCEEDED:不可恢复错误,需要特殊处理
📈 监控与指标
关键监控指标
| 指标名称 | 含义 | 正常范围 | 告警阈值 |
|---|---|---|---|
| 成功发送率 | 成功发送的消息占比 | >99.9% | <99% |
| BACK_PRESSURED 频率 | 触发流量控制的次数 | <1% | >5% |
| ADMIN_ACTION 频率 | 内部维护操作次数 | 取决于 Term Buffer 大小 | N/A(正常行为) |
| NOT_CONNECTED 持续时间 | 没有订阅者的时间 | <100 ms | >1 秒 |
| 平均重试次数 | 每条消息的平均重试次数 | <0.1 | >1 |
| 发送延迟 P99 | 99% 消息的发送延迟 | <1 ms | >10 ms |
监控实现示例
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
public class MonitoredPublisher {
private final ReliablePublisher publisher;
private final Counter successCounter;
private final Counter backPressuredCounter;
private final Counter adminActionCounter;
private final Counter notConnectedCounter;
private final Timer offerTimer;
public MonitoredPublisher(ReliablePublisher publisher,
MeterRegistry registry) {
this.publisher = publisher;
// 注册指标
this.successCounter = Counter.builder("aeron.publish.success")
.description("成功发送的消息数")
.register(registry);
this.backPressuredCounter = Counter.builder("aeron.publish.back_pressured")
.description("触发流量控制的次数")
.register(registry);
this.adminActionCounter = Counter.builder("aeron.publish.admin_action")
.description("内部维护操作次数")
.register(registry);
this.notConnectedCounter = Counter.builder("aeron.publish.not_connected")
.description("没有订阅者的次数")
.register(registry);
this.offerTimer = Timer.builder("aeron.publish.latency")
.description("发送延迟")
.register(registry);
}
public long offer(DirectBuffer buffer, int offset, int length)
throws TimeoutException {
return offerTimer.recordCallable(() -> {
long result = publisher.offer(buffer, offset, length);
// 更新指标
successCounter.increment();
// 从 publisher 获取详细指标
ReliablePublisher.PublisherMetrics metrics = publisher.getMetrics();
backPressuredCounter.increment(metrics.backPressuredCount);
adminActionCounter.increment(metrics.adminActionCount);
notConnectedCounter.increment(metrics.notConnectedCount);
publisher.resetMetrics();
return result;
});
}
}
// Grafana 告警规则示例(PromQL)
// 告警:BACK_PRESSURED 频率过高
rate(aeron_publish_back_pressured_total[1m]) / rate(aeron_publish_success_total[1m]) > 0.05
// 告警:发送延迟 P99 过高
histogram_quantile(0.99, rate(aeron_publish_latency_bucket[1m])) > 0.010
✅ 最佳实践总结
✅ 推荐做法
- 使用
ReliablePublisher封装重试逻辑 - 为所有重试操作设置超时时间
- ADMIN_ACTION 立即重试,无需等待
- BACK_PRESSURED 使用 IdleStrategy 退避
- 监控重试频率和发送延迟
- 使用 BufferClaim 进行零拷贝发送
- 定期收集和分析性能指标
- 在单元测试中模拟所有返回值场景
❌ 避免的做法
- 忽略负数返回值,假设一定成功
- 对 ADMIN_ACTION 使用 sleep 等待
- 无限重试,不设置超时机制
- 在 BACK_PRESSURED 时使用忙等待(busy-wait)
- 忽略 CLOSED 错误继续发送
- 不监控重试频率和失败率
- 在高频路径上使用异常捕获
- 混淆不同返回值的处理策略
性能优化建议
- Term Buffer 大小:根据消息大小和发送速率调整,通常 64MB-1GB
- IdleStrategy 选择:
- 低延迟场景:BusySpinIdleStrategy
- 平衡场景:BackoffIdleStrategy
- 低 CPU 占用:YieldingIdleStrategy
- 批量发送:积累多条消息后批量发送,提高吞吐量
- 零拷贝:优先使用 tryClaim() + BufferClaim
- 预热:系统启动后发送测试消息,预热 JIT 和缓存