打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
聊聊rocketmq的SERVICE_NOT_AVAILABLE

本文主要研究一下rocketmq的SERVICE_NOT_AVAILABLE

SERVICE_NOT_AVAILABLE

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java

public class ResponseCode extends RemotingSysResponseCode {    public static final int FLUSH_DISK_TIMEOUT = 10;    public static final int SLAVE_NOT_AVAILABLE = 11;    public static final int FLUSH_SLAVE_TIMEOUT = 12;    public static final int MESSAGE_ILLEGAL = 13;    public static final int SERVICE_NOT_AVAILABLE = 14;    //......}
  • ResponseCode定义了SERVICE_NOT_AVAILABLE

PutMessageStatus

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java

public enum PutMessageStatus {    PUT_OK,    FLUSH_DISK_TIMEOUT,    FLUSH_SLAVE_TIMEOUT,    SLAVE_NOT_AVAILABLE,    SERVICE_NOT_AVAILABLE,    CREATE_MAPEDFILE_FAILED,    MESSAGE_ILLEGAL,    PROPERTIES_SIZE_EXCEEDED,    OS_PAGECACHE_BUSY,    UNKNOWN_ERROR,}
  • PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE

DefaultMessageStore

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {    //......    public PutMessageResult putMessage(MessageExtBrokerInner msg) {        if (this.shutdown) {            log.warn("message store has shutdown, so putMessage is forbidden");            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {            long value = this.printTimes.getAndIncrement();            if ((value % 50000) == 0) {                log.warn("message store is slave mode, so putMessage is forbidden ");            }            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }        if (!this.runningFlags.isWriteable()) {            long value = this.printTimes.getAndIncrement();            if ((value % 50000) == 0) {                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());            }            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        } else {            this.printTimes.set(0);        }        if (msg.getTopic().length() > Byte.MAX_VALUE) {            log.warn("putMessage message topic length too long " + msg.getTopic().length());            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);        }        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);        }        if (this.isOSPageCacheBusy()) {            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);        }        long beginTime = this.getSystemClock().now();        PutMessageResult result = this.commitLog.putMessage(msg);        long elapsedTime = this.getSystemClock().now() - beginTime;        if (elapsedTime > 500) {            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);        }        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);        if (null == result || !result.isOk()) {            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();        }        return result;    }    //......}
  • putMessage方法在shutdown为true、当前broker的role变为SLAVE、runningFlags.isWriteable()为false时都会返回PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult

RunningFlags

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

public class RunningFlags {    private static final int NOT_READABLE_BIT = 1;    private static final int NOT_WRITEABLE_BIT = 1 << 1;    private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;    private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;    private static final int DISK_FULL_BIT = 1 << 4;    private volatile int flagBits = 0;    public RunningFlags() {    }    //......    public boolean isWriteable() {        if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {            return true;        }        return false;    }    public boolean getAndMakeReadable() {        boolean result = this.isReadable();        if (!result) {            this.flagBits &= ~NOT_READABLE_BIT;        }        return result;    }    public boolean getAndMakeNotReadable() {        boolean result = this.isReadable();        if (result) {            this.flagBits |= NOT_READABLE_BIT;        }        return result;    }    public boolean getAndMakeWriteable() {        boolean result = this.isWriteable();        if (!result) {            this.flagBits &= ~NOT_WRITEABLE_BIT;        }        return result;    }    public boolean getAndMakeNotWriteable() {        boolean result = this.isWriteable();        if (result) {            this.flagBits |= NOT_WRITEABLE_BIT;        }        return result;    }    public void makeLogicsQueueError() {        this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;    }    public void makeIndexFileError() {        this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;    }    public boolean getAndMakeDiskFull() {        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);        this.flagBits |= DISK_FULL_BIT;        return result;    }    public boolean getAndMakeDiskOK() {        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);        this.flagBits &= ~DISK_FULL_BIT;        return result;    }    //......}
  • RunningFlags提供了isWriteable方法,在NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT的flag下都会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值

SendMessageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {    //......    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,                                                   RemotingCommand request, MessageExt msg,                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,                                                   int queueIdInt) {        if (putMessageResult == null) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("store putMessage return null");            return response;        }        boolean sendOK = false;        switch (putMessageResult.getPutMessageStatus()) {            // Success            case PUT_OK:                sendOK = true;                response.setCode(ResponseCode.SUCCESS);                break;            case FLUSH_DISK_TIMEOUT:                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);                sendOK = true;                break;            case FLUSH_SLAVE_TIMEOUT:                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);                sendOK = true;                break;            case SLAVE_NOT_AVAILABLE:                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);                sendOK = true;                break;            // Failed            case CREATE_MAPEDFILE_FAILED:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("create mapped file failed, server is busy or broken.");                break;            case MESSAGE_ILLEGAL:            case PROPERTIES_SIZE_EXCEEDED:                response.setCode(ResponseCode.MESSAGE_ILLEGAL);                response.setRemark(                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");                break;            case SERVICE_NOT_AVAILABLE:                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);                response.setRemark(                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");                break;            case OS_PAGECACHE_BUSY:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");                break;            case UNKNOWN_ERROR:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("UNKNOWN_ERROR");                break;            default:                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("UNKNOWN_ERROR DEFAULT");                break;        }        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);        if (sendOK) {            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),                putMessageResult.getAppendMessageResult().getWroteBytes());            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());            response.setRemark(null);            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());            responseHeader.setQueueId(queueIdInt);            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());            doResponse(ctx, request, response);            if (hasSendMessageHook()) {                sendMessageContext.setMsgId(responseHeader.getMsgId());                sendMessageContext.setQueueId(responseHeader.getQueueId());                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);                sendMessageContext.setCommercialSendTimes(incValue);                sendMessageContext.setCommercialSendSize(wroteSize);                sendMessageContext.setCommercialOwner(owner);            }            return null;        } else {            if (hasSendMessageHook()) {                int wroteSize = request.getBody().length;                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);                sendMessageContext.setCommercialSendTimes(incValue);                sendMessageContext.setCommercialSendSize(wroteSize);                sendMessageContext.setCommercialOwner(owner);            }        }        return response;    }    //......}
  • handlePutMessageResult方法会将SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE

小结

  • ResponseCode定义了SERVICE_NOT_AVAILABLE;PutMessageStatus枚举中有一个是SERVICE_NOT_AVAILABLE;handlePutMessageResult方法会将SERVICE_NOT_AVAILABLE转化为ResponseCode.SERVICE_NOT_AVAILABLE
  • DefaultMessageStore的putMessage方法在shutdown为true、当前broker的role变为SLAVE、runningFlags.isWriteable()为false时都会返回PutMessageStatus.SERVICE_NOT_AVAILABLE的PutMessageResult
  • RunningFlags提供了isWriteable方法,在NOT_WRITEABLE_BIT、WRITE_LOGICS_QUEUE_ERROR_BIT、DISK_FULL_BIT、WRITE_INDEX_FILE_ERROR_BIT的flag下都会返回false;而getAndMakeReadable、getAndMakeNotReadable、getAndMakeWriteable、getAndMakeNotWriteable、makeLogicsQueueError、makeIndexFileError、getAndMakeDiskFull、getAndMakeDiskOK均可能改变flagBits的值

doc

  • ResponseCode
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
RocketMQ Broker消息处理流程剩余源码解析
1. Context
从 Android 静音看正确的查找 bug 的姿势
API Monitor: Spy on API Calls and COM Interfaces (Freeware 32
快速掌握模板方法模式
Azure上的Java:云原生身份验证
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服