面试问题

1. liteflow 流程编排

LiteFlow 是一个轻量级、快速、稳定、可编排的组件式流程引擎。其核心思想是将复杂的业务流程分解为独立的、可复用的组件(Node),然后通过一个规则文件(通常是 XML 或 EL 表达式)来灵活地组装和编排这些组件的执行顺序和逻辑

  1. 初始化:加载
    1. 组件扫描与注册:应用启动时,LiteFlow 通过 Spring 的自动扫描或手动配置的方式,发现所有实现了组件接口的类。
    2. 规则解析与编译:解析器解析为抽象的语法元素,编译器将语法元素转换并组装成具体的执行链,生成复杂拓扑关系,及有向无环图 DAG。
  2. 运行时:FlowExecutor.execute2Resp
    1. 获取执行链:执行器根据传入的链 ID(如 "chain1")从内存中获取到预先编译好的执行链。
    2. 创建执行槽 (Slot) 和 数据上下文 (Context)Slot 为每一次流程执行创建一个唯一的数据槽,全局并且安全的存储空间,存储框架本身的执行元数据;Context 业务是数据上下文。
    3. 构建执行器并遍历执行
      1. 框架会创建一个执行器 (NodeExecutor),它负责遍历和执行执行链中的每一个节点。
      2. 执行器从执行链的起点开始,按图索骥。
    4. 响应与清理
      1. 整个链执行完毕后,执行器将结果封装返回给调用方。
      2. 清理本次执行在 Slot 中产生的临时数据,确保不会污染下一次执行。

原子组件:

创建订单:

2. Apollo 配置中心

虽然 apollo 核心机制是cp,但通过以下设计缓解了强一致性对可用性的冲击。

  1. 客户端本地缓存:自动降级使用本地缓存。
  2. 配置灰度发布:支持分批发布配置,即使部分节点故障,未发布的节点仍可用。
  3. 长轮询降级:配置更新推送失败时,客户端回退为定时轮询(5分钟),而非直接中断服务。

3. redis

1. Redisson

底层采用 hash 结构,公平锁还有 list 拥有存放等待队列,zset 用于存放锁超时时间

货主创建订单时对货主ID进行加锁操作。

非公平锁加锁

  1. #加锁
    hset order:create:货主ID 客户端唯一标识[UUID + 线程ID] 1
    
    #可重入
    hincrby order:create:货主ID 客户端唯一标识[UUID + 线程ID] 1
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24

    2. **看门狗机制**:如果客户端没有显式指定 `leaseTime`(锁持有时间),Redisson 会启动一个**看门狗线程**。

    - 加锁成功后,看门狗会以**锁超时时间的 1/3** 为周期(默认超时30秒,则每10秒一次)定时续期。
    - 锁的过期时间也会设置为 30秒。
    - 它通过不断重置锁的过期时间,来保证只要客户端节点还“活着”,业务逻辑还在执行,锁就不会因为超时而被意外释放,从而避免了死锁的发生。

    3. **锁等待与订阅**:如果第一步的 Lua 脚本返回了数字(即锁的剩余存活时间),表示加锁失败。

    1. 当前线程会**订阅** Redis 的一个特定 Channel(格式:`redisson_lock__channel:{lockName}`)。
    2. 然后使用一个循环,在 `while(true)` 中不断地尝试获取锁。
    3. 一旦锁被释放,持有锁的客户端会**发布(PUBLISH)** 一条消息到这个 Channel。订阅者收到消息后,会被唤醒并立即进行下一次抢锁尝试,而不是等待固定的时间间隔后再重试。这大大减少了锁释放到被获取之间的延迟。

    **非公平锁释放锁**:

    1. ```lua
    #重入减少
    hincrby order:create:货主ID 客户端唯一标识[UUID + 线程ID] -1

    #释放锁
    del order:create:货主ID

    #发布锁已释放消息
    publish redisson_lock__channel:{lockName} 释放锁的消息
  2. 释放锁成功后,会取消看门狗的续期任务。

2. Redis 的 lua 为啥可以保证原子性

Redis 是单线程执行命令的,而 Lua 脚本在执行时会被当作一个独立的、不可中断的“超级命令”来执行

  1. 单线程事件循环:Redis 采用了单线程(主线程)来处理所有的网络 I/O 和键值对操作。这意味着在任何给定的时刻,只有一个命令(或一个 Lua 脚本)在被执行。
  2. 在脚本执行期间,Redis 不会去处理任何其他客户端的命令请求。脚本中的所有 Redis 操作(如 redis.call('set', ...), redis.call('get', ...) 等)都会被连续、不间断地执行完毕。
  3. 只有当整个脚本的逻辑全部执行完成(或者遇到错误中止)后,Redis 才会去处理队列中的下一个命令。

抢单操作:

  1. 生成抢单调度单时将抢单调度单号缓存;

    1
    set grab:调度单号 1
  2. 司机抢单时,采用 lua 脚本,删除 redis key ;

    1
    del grab:调度单号
  3. lua脚本 del 成功后使用redis 缓存;

    1
    zadd drivers:grabs 抢单时间 调度单号:司机ID
  4. 抢单成功后更新调度单的配送司机为抢单司机,标记调度单进入货主待确认状态,或进入配送状态;

  5. 后台定时任务进行对账,避免数据库操作失败。

    1
    zrange drivers:grabs 0 100
  6. 如果 redis 和数据库不一致,则更新数据库,并删除redis。

    1
    zrem drivers:grabs 调度单号:司机ID ...

3. 延时双删 - mysql与redis数据一致性问题

由于 查询订单时,一般会查询对应的订单配送节点,所以会进行连表查询,为了避免这样的大量连表操作直接打到数据库造成数据库压力过大,引入的 redis 缓存订单查询信息,而 redis 是基于内存的数据库,相比于 mysql 基于磁盘拥有更高的效率和吞吐量,固引入redis 缓存订单信息,并且查询订单信息的dubo接口调用量也较大,另一方面也加速的数据查询效率,提升用户体验。

  1. 针对 redis 和 mysql 数据一致性问题,避免 redis 缓存存在脏数据的情况;
  2. 先删除缓存,更新数据库,订单号作为缓存键;
  3. 为了保证 redis 删除成功,采用双删除策略;
    • 发送MQ延时消息,延时时间设置为 5秒,level = 2;
    • 使用 ScheduledExecutorService 1秒后执行删除;

4. CompletableFuture 基本原理

CompletableFuture 的核心思想是基于任务的协作和链式调用,而非传统的线程间直接协作(如 Future.get() 的阻塞)。

CompletionStage: 定义了异步计算过程中的“阶段”(Stage)概念。一个阶段代表一个异步操作,它可以由另一个阶段触发,也可以在完成后触发一系列后续阶段。这才是 CompletableFuture 强大功能的源泉。

1. 状态与结果存储

CompletableFuture 内部使用一个 volatileObject 类型的变量(在最新JDK中是一个 volatile Object result)来存储两种东西:

  • 最终的计算结果 (正常完成)
  • 遇到的异常 (异常完成)

同时,它使用一个 volatile int 变量来记录状态(如 NEW, COMPLETING, NORMAL, EXCEPTIONAL, CANCELLED 等)。通过状态和 result 字段,可以判断当前阶段是否完成、是正常完成还是异常完成。

2. 依赖与链式调用(核心中的核心)

这是 CompletableFuture 最复杂也最精妙的部分。当一个阶段(A)尚未完成时,你为其添加了一个后续动作(B,形成阶段B),这个依赖关系是如何存储和触发的?

答案是:一个栈结构。

  • CompletableFuture 内部有一个 stack 字段(通常是一个 Completion 对象),它代表了这个阶段的所有依赖(Dependencies),即所有等待当前阶段完成才能执行的后续阶段。
  • Completion 本身是一个链式栈节点,它有 next 指针指向下一个 Completion

Completion 是一个抽象类,它有多个重要的实现类:

  • UniCompletion: 处理一个依赖源的后续操作。例如:thenApply, thenAccept, thenRun
  • BiCompletion: 处理两个依赖源的后续操作。例如:thenCombine, thenAcceptBoth
  • CoCompletion: 用于 allOf / anyOf 等组合操作。

当你调用诸如 thenApply 这样的方法时:

  1. 如果当前阶段已经完成,JVM 会立即创建一个新线程(通常是 ForkJoinPool 中的线程)来执行你传入的函数。
  2. 如果当前阶段尚未完成,则会创建一个 UniCompletion 对象(例如 UniApply),这个对象封装了:
    • 后续阶段的目标 CompletableFuture(用于存储结果)
    • 你传入的函数(Function, Consumer, Runnable
    • 执行后续操作需要的线程池(Executor)
  3. 将这个 Completion 对象通过 CAS(Compare-And-Swap)操作 压入当前阶段的依赖栈(stack)中。这是一个无锁的并发操作,保证了线程安全。

3. 完成与触发

当一个 CompletableFuture(阶段A)通过 complete(value)completeExceptionally(exception) 方法被标记为完成时:

  1. 设置结果和状态:首先 CAS 修改状态为 COMPLETING,然后设置 result,最后修改状态为 NORMALEXCEPTIONAL
  2. 触发后续操作:然后,它开始遍历并弹出其依赖栈(stack)上的所有 Completion 对象。
  3. 执行后续操作:对于每一个弹出的 Completion 对象,调用它的 tryFire 方法。该方法会:
    • 检查依赖是否满足(例如,BiCompletion 需要检查两个源是否都完成)。
    • 如果满足,则使用指定的线程池(或默认线程池)来执行你传入的函数。
    • 将函数的执行结果(或异常)通过 complete 方法设置给后续阶段(B)的 CompletableFuture
  4. 链式传播:当阶段 B 被完成时,它会重复步骤 2 和 3,触发它自己的依赖栈上的所有后续操作。如此一环扣一环,就形成了链式反应,实现了整个异步任务的流水线。

让我们通过一个简单的链式调用来看看整个流程是如何运作的。

1
2
3
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenAccept(System.out::println);
  1. 初始阶段 (CF1):
    • supplyAsync(() -> “Hello”) 立即提交任务到默认的 ForkJoinPool,并返回一个未完成的 CompletableFuture(我们称之为 CF1)。
    • 异步任务 () -> “Hello” 在线程池中执行。
  2. 添加第一个依赖 (CF2):
    • 调用 CF1.thenApply(s -> s + “ World”)
    • 由于 CF1 未完成,JVM 创建一个 UniApply 对象,封装了函数 s -> s + “ World” 和后续的新 CompletableFuture(CF2)。
    • 通过 CAS 将这个 UniApply 压入 CF1 的依赖栈。
  3. 添加第二个依赖 (CF3):
    • 调用 CF2.thenAccept(System.out::println)
    • 由于 CF2 未完成(因为它在等 CF1),JVM 创建一个 UniAccept 对象,封装了函数 System.out::println 和后续的新 CompletableFuture(CF3)。
    • 通过 CAS 将这个 UniAccept 压入 CF2 的依赖栈。
  4. 初始任务完成:
    • 初始的 supply 任务执行完毕,得到结果 "Hello"
    • 它调用 CF1.complete(“Hello”)
    • CF1 设置自己的 result"Hello",状态变为 NORMAL
    • CF1 开始遍历它的依赖栈,发现了 UniApply
  5. 触发第一个后续操作:
    • CF1 调用 UniApply.tryFire(...)
    • tryFire 方法从 CF1 的 result 中取出 "Hello",作为参数执行函数 s -> s + “ World”,得到新结果 "Hello World"
    • 然后调用 CF2.complete(“Hello World”)
  6. 触发链式反应:
    • CF2 被完成了!它设置自己的 result"Hello World"
    • CF2 开始遍历自己的依赖栈,发现了 UniAccept
    • CF2 调用 UniAccept.tryFire(...)
    • tryFire 方法从 CF2 的 result 中取出 "Hello World",执行函数 System.out::println,打印出结果。
    • 然后调用 CF3.complete(null)(因为 thenAccept 没有返回值)。
  7. 最终状态:
    • 此时,CF1、CF2、CF3 全部完成。整个异步流水线执行结束。

如果整个链式调用过程中某一步发生了异常,异常会被捕获并包装成一个 CompletionException,然后通过 completeExceptionally 传递给后续阶段。后续阶段如果也有异常处理函数(如 exceptionally),则会触发它,否则异常会一直被传播,直到被获取(如调用 join()/get() 时)。

5. 本地消息表 + 定时任务

  1. 少货单核实完成需要通知电商售后以及生成后续单据。
  2. 使用本地消息表,将 消息表与 少货单放在同一个库下面,核实完成时写入少货单的消息数据。
  3. 采用 ElasticJob 定时任务轮询消息表内容,调用电商售后的 http 接口实现通知。
  4. 失败自动更新消息表的重试次数。
  5. 重试次数达到最大重试还未完成,企业微信预警人工介入。

消息表内容

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
business_key VARCHAR(64) NOT NULL COMMENT '业务唯一标识',
status TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0-待发送,1-已发送,2-发送失败,3-已确认',
retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数',
max_retry_count INT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
last_response_body TEXT COMMENT '最后响应内容',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_status_next_retry_time (status, next_retry_time),
INDEX idx_business_key (business_key)
) COMMENT '本地消息表';

6. ElasticJob 定时任务

ElasticJob 是一个分布式定时任务调度框架,其核心设计目标在于解决传统单点定时任务(如 Spring Scheduler, Quartz 单机版)的可靠性大数据量处理问题。它通过分片的概念,将任务分布式地执行,并提供了弹性扩容、故障转移、任务治理等能力。

核心流程总结

启动注册 -> 选举Leader -> Leader计算分片 -> 所有实例获取分片 -> 定时触发执行 -> (节点宕机 -> 监听发现 -> 重新分片 -> 继续执行)

优点

  1. 分布式与弹性扩展: 轻松应对大数据量任务,水平扩展能力强。
  2. 高可用: 自动的故障转移机制,无单点风险。
  3. 任务治理: 提供运维控制台,可以查看状态、修改配置、触发执行等。
  4. 避免重复执行: 通过分片机制,从设计上就保证了同一分片在同一时刻只会被一个实例执行。

缺点/注意点

  1. 系统复杂性: 引入了 Zookeeper 作为外部依赖,部署和运维复杂度增加。
  2. 网络开销: 实例与 ZK 之间需要频繁的心跳和通信,网络波动可能影响稳定性。
  3. 并非绝对实时: 分片重分配、故障转移都有短暂延迟(取决于 ZK 会话超时时间),对于秒级精度的任务需要考量。

流程 1:作业启动与注册

  1. 应用启动: 当你的 Spring Boot 或 Java 应用启动时,内置的 ElasticJob 客户端(ElasticJobLite)会自动初始化。
  2. 连接注册中心: 客户端根据配置的 Zookeeper 地址,建立连接。
  3. 实例注册: 客户端在 ZK 上创建临时节点(Ephemeral Node)来注册自己。
    • 路径示例:/${namespace}/${job_name}/instances/192.168.1.100@-@12345 (IP@-@进程ID)
    • 关键点:使用临时节点非常重要。如果服务器宕机或与 ZK 的连接断开,这个节点会自动消失,注册中心能立即感知到该实例下线。

流程 2:任务调度与分片(Leader Election & Sharding)

  1. 选举主节点: 当一个作业的多个实例都启动后,它们会竞争一个“Leader”角色。在 ZK 上通过创建临时节点 /${namespace}/${job_name}/leader/election/instance 来实现,成功创建者即为 Leader(主节点)。
  2. 分片计算(由 Leader 执行)
    • 非 Leader 节点: 休眠等待,不参与分片计算。
    • Leader 节点: 从 ZK 获取所有在线的作业实例列表。然后,根据配置的分片策略(如平均分配),计算出每个实例应该执行哪些分片项。
    • 示例: 假设有 3 台服务器(A, B, C),总分片数为 10。平均分配策略计算出的结果可能是:A -> [0, 1, 2], B -> [3, 4, 5, 6], C -> [7, 8, 9]。
  3. 分片结果写入: Leader 将计算好的分片分配结果(instance -> sharding_items)写入到 ZK 的特定节点。
    • 路径示例:/${namespace}/${job_name}/sharding/0/instance 节点的数据是 192.168.1.100@-@12345,表示分片 0 分配给了这个实例。

流程 3:任务执行

  1. 监听分片分配: 每个作业实例都监听着 ZK 上自己所需的分片分配路径。一旦 Leader 写入了新的分片结果,所有实例都会收到通知。
  2. 获取自身分片: 每个实例从 ZK 读取数据,找到分配给自己的分片项列表。
  3. 创建定时任务: ElasticJob 底层使用 Quartz 或内置线程池来管理定时任务。每个实例会为分配给自己每一个分片项创建一个独立的定时任务触发器。
  4. 触发执行: 当 Cron 表达式设定的时间到达时,Quartz 触发器或线程池会触发 ElasticJob 接口的 execute 方法。
  5. 执行业务逻辑: 在你的 execute 方法实现中,你可以通过 ShardingContext 参数获取到当前执行所对应的分片项(shardingItem)和总分片数(shardingTotalCount)。你的业务代码可以根据这些信息处理对应的数据。

流程 4:失效转移(Failover) - 核心高可用机制

这是 ElasticJob 实现高可用的关键流程。

  1. 节点宕机: 假设服务器 B 突然宕机。
  2. ZK 感知: 由于 B 在 ZK 上注册的是临时节点,节点会自动被 ZK 删除。
  3. 触发监听: 所有其他存活的作业实例(A 和 C)都监听着实例列表的变化,它们立刻感知到 B 下线了。
  4. 查找待转移分片: 系统会去 ZK 的 failover 目录下查找,看是否有分片项标记为需要故障转移。同时,也会检查 B 之前负责的分片(例如分片 3,4,5,6)现在是否没有实例在执行。
  5. 重新选举与分片流程 2 被再次触发。存活的实例重新选举 Leader(例如 A 成为新 Leader),新的 Leader 重新计算分片分配(现在只有 A 和 C 在线),并将 B 的分片项重新分配给 A 和 C。
  6. 继续执行: A 和 C 拿到新的分片分配后,开始执行新的分片任务,其中包括了原来 B 负责的部分。整个过程由框架自动完成,业务无感知,保证了任务的连续性。

7. 线程池

导出 - 使用 CountDownLatch 设置为总页数,主线程阻塞,直到所有数据已写入excel 线程池的核心线程数控制可拉取的最大数据量,避免拉取过多数据造成堆内存溢出。

要理解线程池,必须彻底理解其构造函数的 7 个核心参数:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

这 7 个参数共同决定了线程池的行为策略。

  1. corePoolSize (核心线程数)
    • 线程池中长期维持的线程数量,即使这些线程处于空闲状态,也不会被销毁(除非设置了 allowCoreThreadTimeOuttrue)。
  2. maximumPoolSize (最大线程数)
    • 线程池能容纳的最大线程数。当工作队列满了之后,后续的任务会尝试创建新的线程来执行,直到线程总数达到这个值。
  3. keepAliveTime + unit (线程空闲时间)
    • 作用于非核心线程(如果 allowCoreThreadTimeOuttrue,也作用于核心线程)。当一个非核心线程空闲时间超过 keepAliveTime,它就会被回收销毁。
  4. workQueue (工作队列)
    • 用于保存等待执行的任务的阻塞队列。它是一个 BlockingQueue<Runnable> 的实现。
    • 常见类型
      • ArrayBlockingQueue:有界队列,需要指定大小。
      • LinkedBlockingQueue:无界队列(理论上有界,Integer.MAX_VALUE),会导致 maximumPoolSize 失效。
      • SynchronousQueue:不存储元素的队列,每个插入操作必须等待另一个线程的移除操作。maximumPoolSize 通常会设置很大(如 Integer.MAX_VALUE)。
      • PriorityBlockingQueue:带优先级的无界队列。
  5. threadFactory (线程工厂)
    • 用于创建新线程的工厂。可以用于给线程设置更有意义的名字、优先级、是否为守护线程等,便于排查问题。
  6. handler (拒绝策略)
    • 当线程池已经关闭,或者线程池和队列都已饱和(达到最大线程数且队列已满)时,对新提交的任务采取的处理策略。
    • 内置策略
      • AbortPolicy(默认):直接抛出 RejectedExecutionException 异常。
      • CallerRunsPolicy:用调用者所在的线程来执行任务。
      • DiscardPolicy:直接丢弃任务,不做任何处理。
      • DiscardOldestPolicy:丢弃队列中最旧的任务(下一个即将被执行的任务),然后尝试重新提交当前任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
flowchart TD
A[提交任务] --> B{线程数 < corePoolSize?}
B -- 是 --> C[创建新的核心线程并执行任务]
B -- 否 --> D{任务队列未满?}
D -- 是 --> E[将任务加入工作队列]

D -- 否 --> F{线程数 < maximumPoolSize?}
F -- 是 --> G[创建新的非核心线程并执行任务]
F -- 否 --> H[触发拒绝策略]

E --> I{线程池是否已停止?}
I -- 是 --> J[从队列移除任务并触发拒绝策略]
I -- 否 --> K[等待线程从队列获取并执行]
  1. 提交任务:调用 execute(Runnable command) 方法。
  2. 创建核心线程
    • 如果当前运行的线程数 小于 corePoolSize,即使有空闲线程,线程池也会立即创建一个新的核心线程来执行这个新提交的任务。这一步是优先创建线程,而不是先入队
  3. 入队等待
    • 如果当前运行的线程数 大于或等于 corePoolSize,线程池不会立即创建新线程,而是尝试将任务放入工作队列 (workQueue.offer(command))。
    • 如果入队成功,线程池会进行二次检查(double-check):再次检查线程池状态。如果此时线程池已关闭(SHUTDOWN/STOP),则需要将刚入队的任务回滚(remove)并执行拒绝策略。如果状态正常且池中已无线程(比如刚创建的线程死了),则会确保至少有一个线程来处理队列。
  4. 创建非核心线程
    • 如果入队失败(通常是因为队列已满),线程池会尝试创建新的非核心线程来立即执行这个无法入队的任务。
    • 只有当当前线程数 小于 maximumPoolSize 时,新线程才会被创建并成功执行任务。
  5. 执行拒绝策略
    • 如果第 4 步也失败了(即当前线程数已经达到 maximumPoolSize,无法创建新线程),说明线程池已经完全饱和,无法再处理这个新任务。此时将触发构造时传入的 RejectedExecutionHandler,执行拒绝策略。

8. 限流算法

1. 固定窗口算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.atomic.AtomicInteger;

public class FixedWindowRateLimiter {
private final int limit; // 时间窗口内最大请求数
private final long windowSizeInMillis; // 时间窗口大小(毫秒)
private long currentWindowStart; // 当前窗口开始时间
private AtomicInteger counter; // 当前窗口请求计数

public FixedWindowRateLimiter(int limit, long windowSizeInMillis) {
this.limit = limit;
this.windowSizeInMillis = windowSizeInMillis;
this.currentWindowStart = System.currentTimeMillis();
this.counter = new AtomicInteger(0);
}

public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();

// 如果当前时间已超过当前窗口,重置窗口和计数器
if (currentTime - currentWindowStart >= windowSizeInMillis) {
currentWindowStart = currentTime;
counter.set(0);
}

// 检查是否超过限制
if (counter.get() < limit) {
counter.incrementAndGet();
return true;
}

return false;
}
}

2. 滑动窗口算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SlidingWindowRateLimiter {
private final int limit; // 时间窗口内最大请求数
private final long windowSizeInMillis; // 时间窗口大小(毫秒)
private ConcurrentLinkedQueue<Long> requestTimestamps; // 存储请求时间戳
private AtomicInteger counter; // 当前窗口请求计数

public SlidingWindowRateLimiter(int limit, long windowSizeInMillis) {
this.limit = limit;
this.windowSizeInMillis = windowSizeInMillis;
this.requestTimestamps = new ConcurrentLinkedQueue<>();
this.counter = new AtomicInteger(0);
}

public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
long windowStart = currentTime - windowSizeInMillis;

// 移除过期的时间戳
while (!requestTimestamps.isEmpty() && requestTimestamps.peek() <= windowStart) {
requestTimestamps.poll();
counter.decrementAndGet();
}

// 检查是否超过限制
if (counter.get() < limit) {
requestTimestamps.offer(currentTime);
counter.incrementAndGet();
return true;
}

return false;
}
}

3. 漏桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter {
private final long capacity; // 桶的容量
private final long leakRate; // 漏水速率(毫秒/请求)
private AtomicLong waterLevel; // 当前水位
private long lastLeakTime; // 上次漏水时间

public LeakyBucketRateLimiter(long capacity, long leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
this.waterLevel = new AtomicLong(0);
this.lastLeakTime = System.currentTimeMillis();
}

public synchronized boolean allowRequest() {
leakWater();

if (waterLevel.get() < capacity) {
waterLevel.incrementAndGet();
return true;
}

return false;
}

private void leakWater() {
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastLeakTime;

// 计算应该漏掉的水量
long leaks = elapsedTime / leakRate;
if (leaks > 0) {
long newWaterLevel = Math.max(0, waterLevel.get() - leaks);
waterLevel.set(newWaterLevel);
lastLeakTime = currentTime;
}
}
}

4. 令牌桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.atomic.AtomicLong;

public class TokenBucketRateLimiter {
private final long capacity; // 桶的容量
private final long refillRate; // 令牌添加速率(毫秒/令牌)
private AtomicLong tokens; // 当前令牌数量
private long lastRefillTime; // 上次添加令牌时间

public TokenBucketRateLimiter(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = new AtomicLong(capacity);
this.lastRefillTime = System.currentTimeMillis();
}

public synchronized boolean allowRequest() {
refillTokens();

if (tokens.get() > 0) {
tokens.decrementAndGet();
return true;
}

return false;
}

private void refillTokens() {
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastRefillTime;

// 计算应该添加的令牌数量
long newTokens = elapsedTime / refillRate;
if (newTokens > 0) {
long currentTokens = tokens.get();
long updatedTokens = Math.min(capacity, currentTokens + newTokens);
tokens.set(updatedTokens);
lastRefillTime = currentTime;
}
}
}

9. ShardingSphere

当一个应用程序执行一条 SQL 时,ShardingSphere 内部的处理流程是其原理的核心体现。这个过程主要分为 SQL 解析 -> SQL 路由 -> SQL 改写 -> SQL 执行 -> 结果归并 五个步骤.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
flowchart TD
A[应用程序执行 SQL] --> B[ShardingSphere-JDBC<br>拦截SQL]
B --> C["SQL 解析 (Parsing)"]
C --> C1["解析成AST抽象语法树"]
C1 --> C2["提取分片键值<br>与分片条件"]

C2 --> D["SQL 路由 (Routing)"]
D --> D1["根据分片策略<br>(如: t_order_id % 2)"]
D1 --> D2["确定目标数据源<br>与表(如: ds1.t_order_1)"]

D2 --> E["SQL 改写 (Rewriting)"]
E --> E1["逻辑SQL → 真实SQL<br>(如: t_order → t_order_1)"]
E1 --> E2["补充改写(如: 自增主键)"]

E2 --> F["SQL 执行 (Executing)"]
F --> F1["通过多线程/连接<br>并发执行到多个真实DB"]

F1 --> G["结果归并 (Merging)"]
G --> G1["遍历归并<br>排序归并<br>分组归并等"]
G1 --> H[将最终结果返回给应用]

1. SQL 解析 (Parsing)

  • 目的:理解 SQL 的含义,提取关键信息以供后续路由。
  • 过程
    • 词法解析:将 SQL 字符串拆分成一个个的“单词”(token),例如 SELECT, id, FROM, t_order, WHERE, user_id=100
    • 语法解析:根据 SQL 的语法规则,将这些 token 转换成一颗抽象语法树 (AST)。这棵树清晰地表示了 SQL 的结构:哪部分是查询字段,哪部分是表名,哪部分是条件(WHERE)。
  • 关键提取:从 AST 中提取出分片字段(如 user_id 和它的值(如 100,这是决定数据该去哪里的关键。

2. SQL 路由 (Routing)

  • 目的:根据解析上下文和用户配置的分片规则,决定这条 SQL 应该去哪个数据库、哪个表执行。
  • 过程
    • 根据配置的分片策略(例如:user_id % 4)和从 SQL 中提取出的分片键值(100),计算出目标数据源和表的真实名称。
    • 例如:100 % 4 = 0,那么它可能被路由到 ds_0.t_order_0 这个物理表。
  • 路由类型
    • 单片路由:SQL 只落在一个物理表上,例如根据分片键的精准查询 WHERE user_id = 100
    • 多片路由:SQL 可能落在多个物理表上,例如范围查询 WHERE user_id BETWEEN 10 AND 30 或没有带分片键的查询 SELECT * FROM t_order(全库表路由)。
    • 广播路由:SQL 需要在所有分片上都执行一次,例如 UPDATE t_config SET value = ...(更新一条全局配置)。

3. SQL 改写 (Rewriting)

  • 目的:将逻辑 SQL(应用程序写的,面向逻辑表的 SQL)改写为可以在真实数据库上正确执行的物理 SQL。
  • 常见改写
    • 表名改写:将逻辑表名 t_order 替换为真实的物理表名 t_order_0
    • 补充列:如果 SQL 是 INSERT INTO t_order (content) VALUES ('test') 但没有包含分片键 user_id,而分片规则又需要它,ShardingSphere 可能会根据配置的分布式主键生成器(如雪花算法)生成一个 user_id 并补充到 SQL 中。
    • 分页改写LIMIT 100, 10 在分片环境下需要改写。因为每个分片可能都返回10条数据,但归并后需要的是全局的第100-110条。ShardingSphere 会将其改写为 LIMIT 0, 110,先获取足够的数据,再在内存中进行归并排序和分页。
    • 批量改写INSERT INTO t_order (...) VALUES (1), (2), (3) 可能会根据 user_id 的值被拆分成多条 INSERT 语句,分别插入到不同的物理表中。

4. SQL 执行 (Executing)

  • 目的:执行改写后的真实 SQL。
  • 过程
    • ShardingSphere 会创建一个执行引擎,它通过多线程异步的方式,将路由后的多条 SQL 分发到不同的数据源上执行。
    • 这优化了 IO 操作,避免了串行执行带来的网络延迟累加,大大提升了性能。

5. 结果归并 (Merging)

  • 目的:将从各个数据节点获取到的多个结果集,合并成一个统一的结果集返回给应用程序。
  • 归并类型
    • 遍历归并:最简单的多结果集归并,只需按顺序依次取出每个结果集的数据即可。
    • 排序归并:如果 SQL 中有 ORDER BY,则需要将所有分片返回的结果进行全局排序后再返回。
    • 分组归并:如果 SQL 中有 GROUP BY,则需要将所有分片返回的结果进行全局分组和聚合(如 SUM, COUNT)。
    • 分页归并:结合排序归并,在内存中完成全局排序后,再截取指定区间的数据。

10. Spring

1. spring 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
SpringApplication.run()
|
|-- 初始化 SpringApplication (推断类型,加载Initializers/Listeners)
|
|-- run()
|
|-- 准备环境 (加载配置) & 发布 ApplicationEnvironmentPreparedEvent
|
|-- 创建 ApplicationContext
|
|-- 准备上下文 (关联环境,应用Initializers)
|
|-- refreshContext() -> AbstractApplicationContext.refresh()
|
|-- prepareRefresh()
|
|-- obtainFreshBeanFactory()
|
|-- prepareBeanFactory()
|
|-- postProcessBeanFactory()
|
|-- 【核心1】invokeBeanFactoryPostProcessors(beanFactory)
| |-- 处理 @Configuration, @ComponentScan, @Import
| |-- Spring Boot自动配置类在此处被解析、条件判断、注册BeanDefinition
|
|-- 【核心2】registerBeanPostProcessors(beanFactory)
| |-- 注册各种后置处理器 (如 @Autowired, @PostConstruct, AOP 处理器)
|
|-- onRefresh() // Spring Boot 在此处创建内嵌Web服务器 (Tomcat/Jetty/Netty)
|
|-- registerListeners()
|
|-- 【核心3】finishBeanFactoryInitialization(beanFactory)
| |-- 实例化所有非懒加载单例Bean
| | |-- 创建实例
| | |-- 依赖注入 (@Autowired)
| | |-- Aware接口回调
| | |-- BeanPostProcessor前置处理
| | |-- 初始化 (@PostConstruct, InitializingBean)
| | |-- BeanPostProcessor后置处理 (AOP代理在此发生)
| | |-- 放入单例池
|
|-- finishRefresh() & 发布 ContextRefreshedEvent
|
|-- 执行 ApplicationRunner/CommandLineRunner
|
|-- 发布 ApplicationReadyEvent
|
|-- 启动完成

1. 循环依赖

A实例的初始化过程:

  1. 创建A实例,实例化的时候把A对象工厂放入三级缓存,表示A开始实例化了,虽然我这个对象还不完整,但是先曝光出来让大家知道。
  2. A注入属性时,发现依赖B,此时B还没有被创建出来,所以去实例化B。
  3. 同样,B注入属性时发现依赖A,它就会从缓存里找A对象。依次从一级到三级缓存查询A,从三级缓存通过对象工厂拿到A,发现A虽然不太完善,但是存在,把A放入二级缓存,同时删除三级缓存中的A,此时,B已经实例化并且初始化完成,把B放入一级缓存。
  4. 接着A继续属性赋值,顺利从一级缓存拿到实例化且初始化完成的B对象,A对象创建也完成,删除二级缓存中的A,同时把A放入一级缓存。

11. mysql

1. 索引创建的依据

  1. 频繁出现在查询条件的列:如果某个列经常用于 WHERE 子句中的查询条件、过滤条件,建立索引可以显著提升查询性能
  2. 频繁用于排序 (ORDER BY) 的列:如果某个列经常被用来进行排序操作,建立索引可以加快排序速度。
  3. 经常用于连接(JOIN)的列:如果两个表之间的某些列经常进行 JOIN 操作,应该为这些连接列建立索引,以加快连接速度。
  4. 唯一性要求的列:如果某个列的值必须唯一,可以通过 UNIQUE 索引来保证这一约束,同时提高查询速度。
  5. 高选择性列:高选择性列是指列中的不同值的数量较多。对于这样的列,建立索引可以有效提高查询效率,因为索引可以快速定位所需的记录。
  6. 组合查询条件:对于经常涉及多列查询条件的情况,可以建立复合索引(多列索引),这样可以在多列上提高查询性能。

2. 索引失效的场景

  1. 左模糊或左右模糊的时候
  2. 对索引列使用函数
  3. 对索引列进行表达式计算
  4. 对索引列隐式类型转换
  5. 联合索引要遵循最左匹配原则
  6. 在 where 子句中,如果在 or 前的条件列时索引列,而在 or 后的条件列不是索引列

3. Buffer Pool

用于存放磁盘数据缓存到内存的数据,buffer pool 中主要依据 页来进行缓存,磁盘和内存交换的最小单位是页,一个页的大小为 16kb,主要分为数据页,索引页、自适应哈希索引、锁信息,undo页;

注:redo log缓存独立为 Log Buffer,独立于 Buffer Pool;binlog 存放于 Binlog Cache,线程私有。

  1. free list:存放空闲页,数据从磁盘加载后,从 free list 中获取一个页存放加载的数据
  2. flush list:存放脏页,数据经过SQL语句更新之后,将对应的页放入该list,等待刷盘
  3. lru list:innodb优化了lru,避免缓存预读失败,lru 列表的前面为 热数据,后面为冷数据,从磁盘读取的数据都会优先放在冷数据区域,只有再次读取该页并且该页已滞留lru列表一段时间才会将其放入热数据区,及列表头部

4. explain

1
explain select select_options

EXPLAIN 语句输出的各个列的作用如下:

列名 描述
id 在一个大的查询语句中每个SELECT关键字都对应一个唯一的id
select_type SELECT关键字对应的那个查询的类型
table 表名
partitions 匹配的分区信息
type 数据扫描的类型,All-全表扫描、index-全索引扫描、range-索引范围扫描、ref-非唯一索引扫描、eq_ref-唯一索引扫描、const-结果只有一条的主键或唯一索引扫描
possible_keys 字段可能用到的索引
key 字段实际用到的索引
key_len 实际使用到的索引长度
ref 索引查询时引用的值来源,出现在关联查询,一般为 const或 列名
rows 预估的需要读取的记录条数
filtered 这个字段表示存储引擎返回的数据在server层过滤后,剩下多少满足查询的记录数量的比例,注意是百分比
Extra 一些额外的信息
  • type:数据扫描类型
    1. const:主键索引或唯一索引等值查询。
    2. eq_ref:存在与关联查询,关联字段为被驱动表的唯一键或主键时,驱动表存在多个匹配结果,否则都为 const。
    3. ref:非唯一所以等值查询
    4. range:索引范围查询
    5. index:索引全扫描
    6. All:全表扫描
  • Extra:一些额外的信息
    1. Using where:过滤条件除了索引列还包含了非索引列的过滤条件。
    2. Using index:所需数据只需在索引即可全部获得,不须要再到表中取数据,也就是使用了覆盖索引,避免了回表操作,效率不错。
    3. Using index condition:当使用索引条件下推优化时,如果存在某些被索引的列的判断条件时,MySQL服务器将这一部分判断条件传递给存储引擎,然后由存储引擎通过判断索引是否符合MySQL服务器传递的条件,只有当索引符合条件时才会将数据检索出来返回给MySQL服务器,in 操作也存在索引下推
    4. Using filesort:当查询语句中包含 order by 操作,而且无法利用索引完成排序操作的时候,这时不得不选择相应的排序算法进行,甚至可能会通过文件排序,效率是很低的,所以要避免这种问题的出现。
    5. Using temporary:可能会借助临时表来完成一些功能,比如 group by、distinct、排序之类的,如果不能有效利用索引来完成查询,MySQL 很有可能寻求通过建立内部的临时表来执行查询。

4. 死锁分析

出现原因:少货超时进行超时处理,需要更新父单据和对应的核实子单据信息,所以sql存在 or 操作;这时存在操作员进行少货单核实操作,由于加锁顺序不一致,出现死锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
update t_product_pick_difference
SET afterCheckResult = 'LACK_NOT_CPRF',
afterSalesStatus = 'CONFIRMED',
firstCheckTime = '2025-06-20 18:15:06',
actualAmount = 0.0,
storeRefundNo = 'SH250620136307027928895',
operatorId = 0,
operatorName = 'System'
where deliveryDate = '2025-06-20 00:00:00'
and stationId = 1002178
and (afterSalesLogisticsId = '250620023102311160113901244' or parentId = 1142806249415794688)
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 10260 page no 11293 n bits 480 index idx_parentId of table `xsyx_station_pick2`.`t_product_pick_difference` trx id 6568742189 lock_mode X locks rec but not gap waiting
Record lock, heap no 407 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
0: len 8; hex 1142806249415794688; asc : ` ;;
1: len 8; hex 1142884530067533825; asc Wl ;;

*** (2) TRANSACTION:
TRANSACTION 6568742188, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
5 lock struct(s), heap size 1136, 3 row lock(s), undo log entries 6
MySQL thread id 24971418, OS thread handle 139582239307520, query id 7925580039 10.70.3.23 root updating
update
t_product_pick_difference
set waitCount = waitCount - 1,
waitScatteredCount = waitScatteredCount - 1,
status = 'VERIFIED'
where stationId = 1002178
and id = 1142806249415794688
and waitCount = 1
and waitScatteredCount = 1
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 10260 page no 11293 n bits 480 index idx_parentId of table `xsyx_station_pick2`.`t_product_pick_difference` trx id 6568742188 lock_mode X locks rec but not gap
Record lock, heap no 407 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
0: len 8; hex 1142806249415794688; asc : ` ;;
1: len 8; hex 1142884530067533825; asc Wl ;;

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 10260 page no 15486 n bits 96 index PRIMARY of table `xsyx_station_pick2`.`t_product_pick_difference` trx id 6568742188 lock_mode X locks rec but not gap waiting
Record lock, heap no 19 PHYSICAL RECORD: n_fields 70; compact format; info bits 0
0: len 8; hex 1142806249415794688; asc : ` ;;
1: len 6; hex 0001875bcb23; asc [ #;;
2: len 7; hex e50000403d0110; asc @= ;;
3: len 8; hex 8000000000000000; asc ;;
4: len 8; hex 1142806249403211776; asc : ` ;;


*** WE ROLL BACK TRANSACTION (1)
------------
TRANSACTIONS
------------
Trx id counter 6570540943
Purge done for trx's n:o < 6570540801 undo n:o < 0 state: running but idle
  1. 获取死锁日志,afterSalesLogisticsId = ‘250620023102311160113901244’ 对应的主键为 1142806249415794688,parentId = 1142806249415794688 对应的主键为 1142884530067533825

    1
    SHOW ENGINE INNODB STATUS;
  2. 事务A执行下面语句,由于是or操作,首先获得了 afterSalesLogisticsId = ‘250620023102311160113901244’ 对应的 afterSalesLogisticsId 索引锁和其对应的主键锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    update t_product_pick_difference
    SET afterCheckResult = 'LACK_NOT_CPRF',
    afterSalesStatus = 'CONFIRMED',
    firstCheckTime = '2025-06-20 18:15:06',
    actualAmount = 0.0,
    storeRefundNo = 'SH250620136307027928895',
    operatorId = 0,
    operatorName = 'System'
    where deliveryDate = '2025-06-20 00:00:00'
    and stationId = 1002178
    and (afterSalesLogisticsId = '250620023102311160113901244' or parentId = 1142806249415794688)
  3. 事务B执行下面语句,插入了一条 parentId = 1142806249415794688 的数据,这时事务A的 or parentId = 1142806249415794688 还为锁定,由于数据插入成功

    1
    INSERT INTO `t_product_pick_difference` (`id`,`parentId`,`requestId`,`areaId`,`areaName`,`stationId`,`stationCode`,`storeCode`,`storeName`,`deliveryDate`,`warehouseId`,`warehouseName`,`skusn`,`productName`,`productPicture`,`productSpec`,`productCategory`,`differenceType`,`differenceCount`,`differenceScatteredCount`,`waitCount`,`waitScatteredCount`,`correctPickStoreSortNo`,`correctSkusn`,`correctPickCount`,`incorrectPickStoreSortNo`,`incorrectSkusn`,`incorrectPickCount`,`submitterId`,`submitterName`,`responsiblerId`,`responsiblerName`,`verifierId`,`verifierName`,`differenceTypeCode`,`afterCheckResult`,`afterSalesLogisticsId`,`firstCheckTime`,`afterSaleTime`,`afterAmount`,`actualAmount`,`lackProductReason`,`afterSalesTypeText`,`afterSalesType`,`afterSalesStatus`,`remark`,`status`,`tmCreate`,`tmSmp`,`differenceId`,`generateTemp`,`operatorId`,`operatorName`,`taskFailureReason`,`taskSuccessPictures`,`pickType`,`storeRefundNo`,`directorId`,`directorName`,`channelType`,`logisticsTag`,`reportStatus`,`errorLog`,`auditTime`,`applyRemark`,`voucherImages`,`needStationVerify`,`orderSource`) VALUES ('1142884530067533825','1142806249415794688','1750414504888',102,'湖北区域',1002178,'鄂K042','420922102645','兴盛美团优选东方书店','2025-06-20',10220,'武汉园区-集货仓','037695080','蓝月亮 深层洁净护理洗衣液 500g/包 薰衣草香','http://image.xsyxsc.com/vendor/20221115/3f208fe7-0e45-4707-8a97-7d8a99dd68351668501814753.png','500g/包','general','AFTER_SALE_LACK_PRODUCT',1.0,1,0.0,0,'龚鑫-218','',0,'','',0,0,'System',0,'',117773,'龚鑫','UPSTREAM_SHIP_DAMAGE','LACK_NOT_CPRF',null,'2025-06-20 18:15:06','2025-06-20 12:32:56',5.49,4.99,'差1代','物流少货','LOGISTICS_LACK_PRODUCT','FINISHED','','VERIFIED','2025-06-20 18:15:05','2025-06-20 18:15:05','L10021782025062000160',16,0,'System','','','generalCanBracket','SH250620136307027928895',117773,'龚鑫','USER_APPLY','[]','OK','DF021936004873703211010',null,null,null,'1','toC');
  4. 事务A 加完 or 操作前的锁,继续加 or 后的锁 parentId = 1142806249415794688,但是由于事务B进行了 parentId = 1142806249415794688 记录的插入操作,产生 ,所以事务A在进行加锁执行时出现锁等待,等待 parentId = 1142806249415794688 的 idx_parentId 锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    update t_product_pick_difference
    SET afterCheckResult = 'LACK_NOT_CPRF',
    afterSalesStatus = 'CONFIRMED',
    firstCheckTime = '2025-06-20 18:15:06',
    actualAmount = 0.0,
    storeRefundNo = 'SH250620136307027928895',
    operatorId = 0,
    operatorName = 'System'
    where deliveryDate = '2025-06-20 00:00:00'
    and stationId = 1002178
    and (afterSalesLogisticsId = '250620023102311160113901244' or parentId = 1142806249415794688)
  5. 事务B插入结束之后需要需要根据,需要更新主单数据为已核实,这时会对事务A中锁定的 afterSalesLogisticsId = ‘250620023102311160113901244’ 记录进行更新,出现获取主键锁 1142806249415794688 发生阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    update
    t_product_pick_difference
    set waitCount = waitCount - 1,
    waitScatteredCount = waitScatteredCount - 1,
    status = 'VERIFIED'
    where stationId = 1002178
    and id = 1142806249415794688
    and waitCount = 1
    and waitScatteredCount = 1

这时由于事务A和事务B都互相等待,出现死锁

解决:由于数据同步事务B被抛弃,会执行事务A,事务A执行结束之后 主从同步继续执行事务B,对系统影响较小,在数据迁移过程中仅出现一次,概率小;如果非要解决,可以将 or 操作分到两个事务进行,

12. RocketMQ

1. 是如何保证高可用

1. 高并发

优秀的存储架构设计:

  • 顺序写(Sequential Write): RocketMQ 的所有消息都持久化到磁盘的 CommitLog 文件中。无论消息属于哪个 Topic,都顺序追加写入到这个大文件中。磁盘的顺序写速度非常快(堪比内存随机写),这是高吞吐的基石。
  • 零拷贝(Zero Copy): 当消费者拉取消息时,Broker 需要将磁盘上的数据通过网络发送出去。传统的方式需要经过:磁盘 -> 内核缓冲区 -> 用户缓冲区 -> Socket 缓冲区 -> 网卡,共 4次拷贝3次上下文切换。RocketMQ 使用 mmap(内存映射)和 sendfile 等零拷贝技术,直接将磁盘文件数据映射到操作系统内核地址空间,然后直接从内核空间将数据通过网卡发送出去,减少了上下文切换和不必要的数据拷贝,极大提升了网络IO效率。
  • 异步刷盘(Async Flush): 消息写入 CommitLog 时,并非立即同步刷盘(Sync Flush),而是先写入到操作系统 Page Cache 中就返回成功,然后由后台线程异步地将多个消息一起刷盘。这种方式牺牲了极少量可靠性(机器宕机会丢失Page Cache中的数据),换来了巨大的性能提升。当然,RocketMQ 也提供了同步刷盘选项,以满足对可靠性要求极高的场景。
  • 消息过滤在服务端完成: 消费者可以提交过滤条件(如Tag),Broker 会帮消费者进行过滤,只返回符合条件的消息。这节省了不必要的网络传输开销。

高效的网络IO与线程模型:

  • Reactor 多线程模型: RocketMQ 使用了 Netty 作为网络通信框架,采用了经典的 Reactor 多线程模型。有专门的线程(Netty EventLoopGroup)处理网络连接、IO读写,另有独立的业务线程池处理编码/解码、心跳、消费请求等,职责分离,避免了IO操作阻塞业务处理。
  • 异步发送: 生产者提供异步发送API。发送消息时,消息会被放入一个缓冲队列后立即返回,由后台线程异步完成真正的发送,从而不会阻塞主业务线程。

生产端与消费端的负载均衡:

  • Topic 分区(Queue): 一个 Topic 可以被分为多个 MessageQueue。发送消息时,生产者会通过负载均衡策略(如轮询、哈希)将消息发送到不同的 Queue 中。这实现了发送端的负载均衡,将压力分散到多个 Queue。
  • 消费组(Consumer Group): 消费端同样通过负载均衡策略,将一个 Consumer Group 内的多个消费者实例分配到不同的 MessageQueue 上,实现消费端的负载均衡(一个 Queue 同一时间只被一个消费者消费)。通过增加 Queue 数量和消费者实例,可以线性地提升Topic的并发处理能力。

2. 高可用

高可用旨在保证系统在部分节点故障时,服务仍然可用,数据不会丢失。RocketMQ 主要通过 主从复制(Replication)故障自动转移(Failover) 来实现。

  1. Broker 集群模式

RocketMQ 提供了几种集群部署模式,其中高可用主要依靠多主多从模式。

  • 多 Master 多 Slave(异步复制/同步复制)
    • 角色: 每个 Broker 角色集群由多个 Master 节点和多个 Slave 节点组成。Master 负责处理读写请求,Slave 作为热备份,从 Master 同步数据。
    • 数据复制
      • 异步复制(Async Replication): Master 收到消息后,写入本地 CommitLog 后就返回成功,然后异步地将数据复制到 Slave。性能极高,但如果 Master 宕机且未将最新数据复制到 Slave,可能会造成少量消息丢失。
      • 同步复制(Sync Replication): Master 收到消息后,必须等待 Slave 成功写入数据后,才向生产者返回成功。保证了数据的强一致性,即使 Master 宕机,数据在 Slave 上也有完整备份,但性能会有下降(延迟增加)。
    • 故障转移
      • Slave 自动切换为 Master: 当某个 Master 节点宕机后,NameServer 会检测到其下线。如果配置了自动切换,RocketMQ 可以自动将其对应的 Slave 节点提升为新的 Master,继续提供服务,实现故障自动转移,整个过程对应用透明。
      • 消费端自动重试: 消费端从 NameServer 获取最新的路由信息,知道新的 Master 地址,会自动重连到新的 Master 继续消费。
  1. 元数据管理 - NameServer 集群
  • 去中心化与对等节点: NameServer 集群中的每个节点都是对等的,相互之间无数据同步。Broker 会向所有 NameServer 节点定时注册/发送心跳。
  • 最终一致性: 生产者/消费者客户端可以随机连接任意一个 NameServer 来拉取路由信息。即使某个 NameServer 宕机,只要还有一个 NameServer 存活,整个系统就能正常工作,因为所有 NameServer 最终都拥有几乎相同的路由信息。这使得 NameServer 集群本身非常简单且高可用。
  1. 消息重试与死信队列
  • 消费重试: 如果消费者消费某条消息失败,RocketMQ 会将该消息延迟一段时间后(1s, 5s, 10s, 30s…)重新投递,最多重试16次。
  • 死信队列(Dead-Letter Queue): 如果一条消息经过最大重试次数后仍然消费失败,RocketMQ 会将其投递到一个特殊的死信队列中。这保证了即使有异常消息,也不会阻塞正常消息的消费,系统仍然可用。后续可以由人工或特定程序来处理死信消息。

13. 问题排查/调优

参数 作用
-Xms 堆内存初始大小,默认为物理内存的 1/64
-Xmx 堆内存的最大大小,默认为物理内存的 1/4
-Xmn 堆内新生代的大小,老年代的大小 = [-Xmx] - [-Xmn]
-Xss 每个线程可使用的内存大小,及栈大小,相同物理内存下,线程数反比于该值
-XX:NewRatio 设置新时代和老年代的比值
-XX:SurvivorRatio 新时代中 Eden 和两个 s区的比值,
-XX:MaxTenuringThreshold 设置转入老年代的存活次数
-XX:PermSize/-XX:MaxPermSize 设置永久代最大大小和最小大小
-XX:MetaspaceSize/-XX:MaxMetaspaceSize 设置元空间最大大小和最小大小
-XX:+UseSerialGC 设置串行收集器
-XX:+UseParallelGC 设置并行收集器
-XX:+UseParallelOldGC 设置并行老年代收集器
-XX:+UseConcMarkSweepGC 设置老年代并发收集器 - CMS

1. 问题排查-CPU

主要原因:业务逻辑问题(死循环)、频繁GC、上下文切换过多

1. 业务逻辑问题

  1. jstack - 分析CPU问题:使用 jps 找到对应的 java进程编号

    1
    jps
  2. 使用 top 命令查找 CPU 使用率比较高的一些线程

    1
    top -H -p pid
  3. 将 top 得到的 pid 转为 16进制得到 nid

    1
    printf '%x\n' pid
  4. 接着使用 jstack 找到对应的堆栈信息

    1
    jstack pid | grep 'nid' -C5 --color

2. 频繁GC

使用 jstat 来对 GC 分代变化进行观察,如果看到 GC 比较频繁,再针对 GC 方面做进一步分析

1
jstat -gc pid 1000
具体列名 具体描述
S0C 年轻代中 s0 的容量(字节)
S1C 年轻代中 s1 的容量(字节)
S0U 年轻代中 s0 目前已使用的空间(字节)
S1U 年轻代中 s1 目前已使用的空间(字节)
EU 年轻代中 Eden 的容量(字节)
EU 年轻代中 Eden 目前已使用的空间(字节)
OC old代 的容量(字节)
OU old代 目前已使用的空间(字节)
PC/MC 方法区[永久代/元空间]的容量(字节)
PU/MU 方法区[永久代/元空间]目前已使用的空间(字节)
YGC 从应用程序启动到采样时年轻代中GC次数
YGCT 从应用程序启动到采样时年轻代中GC所用时间(s)
FGC 从应用程序启动到采样时全GC的GC次数
FGCT 从应用程序启动到采样时全GC的GC所用时间(s)
GCT 从应用程序启动到采样时gc用的总时间(s)

3. 上下文切换过多

  1. vmstat命令

    1
    2
    3
    4
    5
    6
    vmstat 1

    # 输出
    procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
    r b swpd free buff cache si so bi bo in cs us sy id wa st
    2 0 221660 186968 103624 555712 0 0 4 9 1 1 2 1 98 0 0

    procs 含义 :进程信息

    • r:等待运行的进程数,数值越大代表系统越繁忙
    • b:不可被唤醒的进程数,数值越大代表系统越繁忙

    memory 含义:内存信息

    • swpd:虚拟内存的使用情况,单位kb
    • free:空闲内存空间,单位kb
    • buff:缓冲的内存空间,单位kb
    • cache:缓存的内存空间,单位kb

    swap 含义:交换分区信息,数值越大,代表内存和磁盘之间的交换越频繁,系统的性能越差

    • si:从磁盘交换至内存的数据量,单位kb
    • so:从内存中交换到磁盘的数据量,单位kb

    io 含义:磁盘读/写信息,数值越大,代表系统的I/O越频繁

    • bi:从块设备中读取的数据的总量,单位块
    • bo:写到块设备数据的总量,单位块

    system 含义:系统信息字段,数值越大,代表系统和接口设备额通讯越频繁

    • in:每秒钟被终端的进程次数
    • cs:每秒钟进行的时间切换次数

    cpu 含义:cpu信息字段

    • us:非内核进程消耗cpu运算时间的百分比
    • sy:内核进程消耗cpu运算时间的百分比
    • id:空闲 cpu的百分比
    • wa:等待 IO 所消耗的 CPU 百分比
    • st:被虚拟机所盗用的百分比
  2. pidstat 进程上下文切换的次数

    1
    2
    3
    4
    5
    pidstat -w pid

    # 输出
    06:14:18 PM UID PID cswch/s nvcswch/s Command
    06:14:19 PM 1000 904704 0.99 0.00 nginx

    cswch:自愿切换次数

    nvcswch:非资源切换的次数

2. 问题排查 - 内存

  1. free 检查内存的各种情况

    1
    2
    3
    4
    5
    6
    free

    # 输出
    total used free shared buff/cache available
    Mem: 1915916 1047676 203400 1696 660400 868240
    Swap: 1049596 221660 827936
    数值 解释
    total 总计物理(swap)内存的大小
    used 已使用的物理(swap)内存的大小
    free 可用的物理(swap)内存的大小
    shared 多个进程共享的内存总额
    buff/cache 磁盘的缓存大小
    available 可以被新应用程序使用的内存大小
  2. OOM

    1. unable to create new native thread:没有足够的空间给线程分配java栈,基本线程池代码问题[shutdown]

      • JVM层面:Xss 减少单个 thread stack 的大小
      • 系统层面: 修改 /etc/security/limits.conf 中的 nofile 和 nproc 来增大 os 对线程的限制
    2. java heap space:堆内存已经达到了 -Xmx 设置的值

      使用 jstack 和 jmap 定位问题,一切正常才需要增加 Xmx 的值

    3. Meta space:元空间内存占用已达到 XX :MaxMetaspaceSize 设置的最大值,思路与上面一致

  3. stack overflow

    栈内存溢出:线程栈需要的内存大于 Xss 值,先排查再调整

  4. jmap 定位代码内存泄漏

    1. jmap 导出 dump 堆栈文件

      1
      jmap -dump:format=b,file=heap.hprof pid
    2. 通过 mat[https://eclipse.dev/mat/download/] 导入 heap.hprof 文件进行分析

      Leak Suspects:内存泄漏问题,mat 给出的内存泄漏建议。

      Top consumer:查看最大对象报告。

      thread overview:线程相关的问题。

      Histogram:类概览。

    3. 原因:

      • new 对象,大量对象创建
      • 文件流操作未正确关闭
      • ByteBuffer 缓存分配不合理
    4. JVM 启动参数,保证内存泄漏时触发生成 hprof 文件

      1
      -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heap.hprof
  5. gc问题和线程

    • jstat 排查 gc 问题

    • pstree:查询进程的总体线程数量

      1
      pstree -p pid | wc -l
  6. 堆外内存[一般NIO]:Netty 报 OutOfDirectMemoryError 错误;直接使用 DirectByteBuffer 报 direct buffer memory

    1. 查询对应pid倒叙前 30 大的内存段,过段时间再跑一次,比较内存段增长的情况

      1
      pmap -x pid | sort -rn -k3 | head -30
    2. 通过 gdb 导出 dump 内存段,通过 heaxdump 进行查看

    3. 直接通过 jcmd 来查看内存段的增长

    4. 通过 jmap -histo:live pid 手动触发fullgc 来看看 对外内存有没有被回收

      • 如果被回收,说明对外内存分配的太小,通过 -XX:MaxDirectMamorySize 调整
      • 通过 jmap 分析那些不能被 gc 的对象,以及和 DirectByteBuffer 之间的引用关系

3. 问题排查 - 磁盘

  • df 查看文件系统状态

    1
    2
    3
    4
    5
    df -hl

    # 输出
    Filesystem Size Used Avail Use% Mounted on
    文件系统 容量 已用 可用 已用% 挂载点
  • iostat 磁盘问题还是性能问题排查,rrqm/s 、wrqm/s 分别表示读写速度,定位哪块磁盘出了问题

    1
    iostat -d -k -x
  • 定位进程磁盘读写

    1. iotop 命令定位文件读写来源,readlink 将 tid 转为 pid

      1
      2
      3
      iotop

      readlink -f /proc/*/task/tid/../..
    2. 根据 pid 查询进程具体的读写情况

      1
      cat /proc/pid/io
    3. 根据 lsof 确定具体的文件读写情况

      1
      lsof -p pid

4. 问题排查 - GC(G1)

1
jstat -gc pid 1000
  1. YoungGC 过频繁:短周期小对象多

    • Eden/新生代设置的太小,调整 -Xmn、-XX:SurvivorRatio 等参数设置来解决
    • 如果参数正常,需要 jmap 和 mat 对堆栈进行进一步排查
  2. YoungGC 耗时过长:明确耗时过长在哪一阶段

    • Ref Proc:需要注意引用相关的对象
    • Root Scanning:注意线程数、跨代引用
    • Object Copy:需要关注对象生存周期,与项目正常时间段进行比较
  3. 触发 FullGC:G1 更多的还是 mixedGC,触发 FullGC 一般都会有问题,G1 会退化使用 Serial 收集器来完成垃圾的清理工作,暂停时间达到秒级别。

    1. 并发阶段失败:在并发标记阶段,mixedGC之前老年代就被填满了,那么这时候 g1 就会放弃并发标记周期,着这种情况可能需要增加堆大小,或调整并发标记线程数 -XX:ConcGCThreads

    2. 晋升失败:在 GC的时候没有足够的内存供存活/晋升对象使用,所以触发 FullGC,这时候可以通过 -XX:G1ReservePercent 来增加预留内存百分比,减少 -XX:InitiatingHeapOccopancyPercent 来提前启动标记。

    3. 大对象分配失败:大对象找不到合适的 Region 空间进行分配,就会进行 FullGC

      • 可以增大内存或增大 -XX:G1HeapRegionSize

      • 不要程序 主动执行 System.gc()

      • 启动参数中配置 -XX:HeapDumpPath=/xxx/heap.hprof 来 dump fullGC相关文件,并通过 jinfo 来进行 GC 前后的dump,,这样可以得到两份文件进行对比

        1
        2
        3
        jinfo -flag +HeapDumpBeforeFullGC pid

        jinfo -flag +HeapDumpAfterFullGC pid

5. 排查案例

1. OutOfMemory

  1. 启动参数增加,OutOfMemory 自动生成堆栈快照

    1
    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heap.hprof
  2. 将 堆栈快照下载,导入 mat 分析,发现代码中 存在一个 LinkdedList 对象,占用了 2.2g 的内存空间

  3. 查看当前对象所属的线程堆栈,最后定位代码源,发现是 shardings-jdbc 中的一个 bug

    1
    select * from t_handover_item where wid = 10104 and handover_id in ()

    在解析该 SQL 时,由于传入的 handover_id 为 空集合,所以 shardings-jdbc 语法分析时进入了死循环

    parseInCondition

    1
    2
    3
    4
    List<SQLExpression> rights = new LinkdedList<>();
    do {
    rights.add(parseExpression(sqlStatement))
    } while(!equalAny(System.RIGHT_PAREN))
  4. bug 修复,业务层 如果 handover_id 列表为空,则不需要查询交接单明细

2. java 程序假死[无法提供服务]

  1. 首先使用获取有问题的 java 程序pid

    1
    jps -l
  2. 首先分析是否 CPU 或 内存打满

    1
    2
    3
    4
    top -H -p pid

    # 将输出的 pid 转为 nid
    printf '%x\n' pid
  3. 获取堆栈进行分析

    1
    jstack pid | grep 'nid' -C5 --color
  4. 如果还是分析不到原因,可能是死锁原因,分析线程持有锁信息,也可能等待数据库连接池资源

    1
    jstack -l pid
  5. 检查 gc 情况,是否gc耗时太久,或者 gc 频繁

    1
    jstat -gc pid 1000
  6. 最终可以可以导出堆栈文件进行静态分析,分析工具 mat

    1
    jmap -dump:format=b,file=heap.hprof pid

3. JVM 调优