Skip to the content.

RocketMQ

主从同步

最基础的主从模式,仅支持主从同步,并不支持主从切换。

HAService

DefaultHAService implements HAService

AcceptSocketService extends ServiceThread

用于Master监听Slave请求,单线程,基于NIO,为每个请求创建一个HAConnection。

DefaultAcceptSocketService extends AcceptSocketService

监听成功后创建DefaultHAConnection。

HAConnection

负责同步逻辑,接受到HAClient的连接请求后创建,包括一个单线程读服务和一个单线程写服务。

DefaultHAConnection implements HAConnection

ReadSocketService

处理客户端同步进度的请求,最多等待一秒执行一次,并包括心跳检测,超时未收到客户端请求会关闭Connection。

WriteSocketService

用于向客户端同步数据,使用MMAP传输CommitLog文件。

GroupTransferService extends ServiceThread

同步阻塞式的主从同步实现,单线程。

HAClient

Slave启动,单线程,用于同步数据,基于HAConnection与Master交互。

DefaultHAClient extends ServiceThread implements HAClient

使用状态机管理,READY状态尝试连接Master,Master侧AcceptSocketService会监听socket并为Slave创建HAConnection;TRANSFER主动从Master拉取同步数据;同步失败/长时间未收到同步数据会切换至READY,Master侧需要防止重复创建HAConnection

public void run() {
    log.info(this.getServiceName() + " service started");

    this.flowMonitor.start();

    while (!this.isStopped()) {
        try {
            switch (this.currentState) {
                case SHUTDOWN:
                    this.flowMonitor.shutdown(true);
                    return;
                case READY:
                    if (!this.connectMaster()) {
                        log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                        this.waitForRunning(1000 * 5);
                    }
                    continue;
                case TRANSFER:
                    if (!transferFromMaster()) {
                        closeMasterAndWait();
                        continue;
                    }
                    break;
                default:
                    this.waitForRunning(1000 * 2);
                    continue;
            }
            long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
            if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("AutoRecoverHAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.closeMasterAndWait();
        }
    }

    this.flowMonitor.shutdown(true);
    log.info(this.getServiceName() + " service end");
}