網(wǎng)上有很多關(guān)于領(lǐng)取pos機(jī)源碼,producer 容錯(cuò)機(jī)制的知識(shí),也有很多人為大家解答關(guān)于領(lǐng)取pos機(jī)源碼的問題,今天pos機(jī)之家(www.tjfsxbj.com)為大家整理了關(guān)于這方面的知識(shí),讓我們一起來看下吧!
本文目錄一覽:
領(lǐng)取pos機(jī)源碼
1. 前言本文主要是介紹一下RocketMQ消息生產(chǎn)者在發(fā)送消息的時(shí)候發(fā)送失敗的問題處理?這里有兩個(gè)點(diǎn),一個(gè)是關(guān)于消息的處理,一個(gè)是關(guān)于broker的處理,比如說發(fā)送消息到broker-a的broker失敗了,我們可能下次就不想發(fā)送到這個(gè)broker-a,這就涉及到一個(gè)選擇broker的問題,也就是選擇MessageQueue的問題。
2. 失敗重試其實(shí)失敗重試我們?cè)诮榻BRocketMQ消息生產(chǎn)者發(fā)送消息的時(shí)候介紹過了,其實(shí)同步發(fā)送與異步發(fā)送都會(huì)失敗重試的,比如說我發(fā)送一個(gè)消息,然后超時(shí)了,這時(shí)候在MQProducer層就會(huì)進(jìn)行控制重試,默認(rèn)是重試2次的,加上你發(fā)送那次,一共是發(fā)送3次,如果重試完還是有問題的話,這個(gè)時(shí)候就會(huì)拋出異常了。
我們來看下這一塊的代碼實(shí)現(xiàn)( DefaultMQProducerImpl 類sendDefaultImpl方法):
這塊其實(shí)就是用for循環(huán)實(shí)現(xiàn)的,其實(shí)不光RocketMQ,分布式遠(yuǎn)程調(diào)用框架Dubbo的失敗重試也是用for循環(huán)實(shí)現(xiàn)的。
3. 延遲故障我們都知道,在RocketMQ中一個(gè)topic其實(shí)是有多個(gè)MessageQueue這么一個(gè)概念的,然后這些MessageQueue可能對(duì)應(yīng)著不同的broker name,比如說id是0和1的MessageQueue 對(duì)應(yīng)的broker name是 broker-a ,然后id是2和3的MessageQueue對(duì)應(yīng)的broker name 是broker-b
我們發(fā)送消息的時(shí)候,其實(shí)涉及到發(fā)送給哪個(gè)MessageQueue這么一個(gè)問題,當(dāng)然我們可以在發(fā)送消息的時(shí)候指定這個(gè)MessageQueue,如果你不指定的話,RocketMQ就會(huì)根據(jù)MQFaultStrategy 這么一個(gè)策略類給選擇出來一個(gè)MessageQueue。
我們先來看下是在哪里選擇的,其實(shí)就是在我們重試的循環(huán)中: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
...// 重試發(fā)送for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // todo 選擇message queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); ...復(fù)制代碼
我們可以看到,它會(huì)把topicPublishInfo 與 lastBrokerName 作為參數(shù)傳進(jìn)去,topicPublishInfo 里面其實(shí)就是那一堆MessageQueue, 然后這個(gè)lastBrokerName 是上次我們選擇的那個(gè)broker name , 這個(gè)接著我們來看下這個(gè)selectOneMessageQueue實(shí)現(xiàn):
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // todo return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}復(fù)制代碼
可以看到它調(diào)用了MQFaultStrategy 這個(gè)類的selectOneMessageQueue 方法,我們接著進(jìn)去:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 發(fā)送延遲故障啟用,默認(rèn)為false if (this.sendLatencyFaultEnable) { try { // 獲取一個(gè)index int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 選取的這個(gè)broker是可用的 直接返回 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } // 到這里 找了一圈 還是沒有找到可用的broker // todo 選擇 距離可用時(shí)間最近的 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // todo return tpInfo.selectOneMessageQueue(lastBrokerName);}復(fù)制代碼
這種延遲故障策略其實(shí)是由sendLatencyFaultEnable來控制的,它默認(rèn)是關(guān)閉的。
3.1 最普通的選擇策略我們先來看下最普通的選擇策略,可以看到調(diào)用了TopicPublishInfo 的selectOneMessageQueue方法:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 消息第一個(gè)發(fā)送的時(shí)候 還沒有重試 也沒有上一個(gè)brokerName if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 這個(gè) 出現(xiàn)在重試的時(shí)候 for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避開 上次發(fā)送的brokerName if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // todo 到最后 沒有避開 只能隨機(jī)選一個(gè) return selectOneMessageQueue(); }}復(fù)制代碼
它這里里面分成了2部分,一個(gè)是沒有 這個(gè)lastBroker的,也就是這個(gè)這個(gè)消息還沒有被重試過,這是第一次發(fā)送這個(gè)消息,這個(gè)時(shí)候它的lastBrokerName就是null,然后他就會(huì)直接走selectOneMessageQueue 這個(gè)無參方法。
public MessageQueue selectOneMessageQueue() { // 相當(dāng)于 某個(gè)線程輪詢 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos);}復(fù)制代碼
先是獲取這個(gè)index ,然后使用index % MessageQueue集合的大小獲得一個(gè)MessageQueue集合值的一個(gè)下標(biāo)(索引),這個(gè)index 其實(shí)某個(gè)線程內(nèi)自增1的,這樣就形成了某個(gè)線程內(nèi)輪詢的效果。這個(gè)樣子的話,同步發(fā)送其實(shí)就是單線程的輪詢,異步發(fā)送就是多個(gè)線程并發(fā)發(fā)送,然后某個(gè)線程內(nèi)輪詢,我們看下他這個(gè)單個(gè)線程自增1效果是怎樣實(shí)現(xiàn)的。
public class ThreadLocalIndex { private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); private final random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); // 如果不存在就創(chuàng)建 然后設(shè)置到threadLocalIndex中 if (null == index) { index = Math.abs(random.nextInt()); this.threadLocalIndex.set(index); } index = Math.abs(index + 1); this.threadLocalIndex.set(index); return index; }}復(fù)制代碼
可以看到這個(gè)sendWhichQueue 是用ThreadLocal實(shí)現(xiàn)的,然后這個(gè)樣子就可以一個(gè)線程一個(gè)index,而且不會(huì)出現(xiàn)線程安全問題。
好了這里我們就把這個(gè)消息第一次發(fā)送時(shí)候MessageQueue看完了,然后我們?cè)賮砜聪滤渌卦嚨臅r(shí)候是怎樣選擇的,也就是lastBrokerName不是null的時(shí)候:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 消息第一個(gè)發(fā)送的時(shí)候 還沒有重試 也沒有上一個(gè)brokerName if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 這個(gè) 出現(xiàn)在重試的時(shí)候 for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避開 上次發(fā)送的brokerName if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // todo 到最后 沒有避開 只能隨機(jī)選一個(gè) return selectOneMessageQueue(); }}復(fù)制代碼
這里其實(shí)就是選擇一個(gè)不是lastBrokerName 的MessageQueue,可以看到它是循環(huán) MessageQueue 集合大小數(shù)個(gè),這樣可能把所有的MessageQueue都看一遍,注意 這個(gè)循環(huán)只是起到選多少次的作用,具體的選擇還是要走某線程輪詢的那一套,到最后是在是選不出來了,也就是沒有這一堆MessageQueue都是在lastBrokerName上的,只能調(diào)用selectOneMessageQueue輪詢選一個(gè)了。
到這我們就把最普通的選擇一個(gè)MessageQueue介紹完了。
3.2 延遲故障的實(shí)現(xiàn)下面我們?cè)賮斫榻B下那個(gè)延遲故障的實(shí)現(xiàn),這個(gè)其實(shí)就是根據(jù)你這個(gè)broker 的響應(yīng)延遲時(shí)間的大小,來影響下次選擇這個(gè)broker的權(quán)重,他不是絕對(duì)的,因?yàn)楦鶕?jù)它這個(gè)規(guī)則是在找不出來的話,他就會(huì)使用那套普通選擇算法來找個(gè)MessageQueue。
它是這樣一個(gè)原理:
在每次發(fā)送之后都收集一下它這次的一個(gè)響應(yīng)延遲,比如我10點(diǎn)1分1秒200毫秒給broker-a了一個(gè)消息,然后到了10點(diǎn)1分1秒900毫秒的時(shí)候才收到broker-a 的一個(gè)sendResult也就是響應(yīng),這個(gè)時(shí)候他就是700ms的延遲,它會(huì)跟你就這個(gè)300ms的延遲找到一個(gè)時(shí)間范圍,他就認(rèn)為你這個(gè)broker-a 這個(gè)broker 在某個(gè)時(shí)間段內(nèi),比如說30s內(nèi)是不可用的。然后下次選擇的時(shí)候,他在第一輪會(huì)找那些可用的broker,找不到的話,就找那些上次不是這個(gè)broker的,還是找不到的話,他就絕望了,用最普通的方式,也就是上面說的那種輪詢算法找一個(gè)MessageQueue出來。接下來我們先來看下它的收集延遲的部分,是這個(gè)樣子的,還是在這個(gè)失敗重試?yán)锩?,然后它?huì)在響應(yīng)后或者異常后面都加一行代碼來收集這些延遲:
...// todo 進(jìn)行發(fā)送sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// todo isolation 參數(shù)為false(看一下異常情況)this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);...復(fù)制代碼
這是正常響應(yīng)后的,注意它的isolation 參數(shù),也就是隔離 是false,在看下異常的
...catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue;}...復(fù)制代碼
他這個(gè)isolation 參數(shù)就是true ,也就是需要隔離的意思。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // todo this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);}復(fù)制代碼
可以看到是調(diào)用了mqFaultStrategy 的updateFaultItem 方法:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // 是否開啟延遲故障容錯(cuò) if (this.sendLatencyFaultEnable) { // todo 計(jì)算不可用持續(xù)時(shí)間 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); // todo 存儲(chǔ) this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); }}復(fù)制代碼
先是判斷是否開啟了這個(gè)延遲故障的這么一個(gè)配置,默認(rèn)是不啟動(dòng)的,但是你可以自己?jiǎn)?dòng)set下就可以了setSendLatencyFaultEnable(true)
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.setSendLatencyFaultEnable(true);復(fù)制代碼
首先是計(jì)算這個(gè)它認(rèn)為broker不可用的這么一個(gè)時(shí)間,參數(shù)就是你那個(gè)響應(yīng)延遲,熔斷的話就配置30000毫秒, 否則的話就是正常的那個(gè)響應(yīng)時(shí)間
/** * 計(jì)算不可用持續(xù)時(shí)間 * @param currentLatency 當(dāng)前延遲 */private long computeNotAvailableDuration(final long currentLatency) { // latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; // notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; // 倒著遍歷 for (int i = latencyMax.length - 1; i >= 0; i--) { // 如果延遲大于某個(gè)時(shí)間,就返回對(duì)應(yīng)服務(wù)不可用時(shí)間,可以看出來,響應(yīng)延遲100ms以下是沒有問題的 if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0;}復(fù)制代碼
他這個(gè)計(jì)算規(guī)則是這個(gè)樣子的,他有兩個(gè)數(shù)組,一個(gè)是響應(yīng)延遲的,一個(gè)是不可使用的時(shí)間,兩個(gè)排列都是從小到大的順序,倒著先找響應(yīng)延遲,如果你這個(gè)延遲大于某個(gè)時(shí)間,就找對(duì)應(yīng)下標(biāo)的不可使用的時(shí)間,比如說響應(yīng)延遲700ms,這時(shí)候他就會(huì)找到30000ms不可使用時(shí)間。
計(jì)算完這個(gè)不可使用時(shí)間后接著調(diào)用了latencyFaultTolerance的updateFaultItem方法,這個(gè)方法其實(shí)就是用來存儲(chǔ)的:
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { // 從緩存中獲取 FaultItem old = this.faultItemTable.get(name); // 緩存沒有的情況 if (null == old) { final FaultItem faultItem = new FaultItem(name); // 設(shè)置延遲 faultItem.setCurrentLatency(currentLatency); // 設(shè)置啟用時(shí)間 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); // 設(shè)置faultItemTable 中 old = this.faultItemTable.putIfAbsent(name, faultItem); // 如果已經(jīng)有了,拿到 老的進(jìn)行更新 if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { // 緩存中已經(jīng)有了,直接拿老的進(jìn)行更新 old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); }}復(fù)制代碼
他有個(gè)faultItemTable 這個(gè)緩存,記錄著 每個(gè)broker的FaultItem的項(xiàng),這個(gè)FaultItem就是保存它能夠使用的一個(gè)時(shí)間(當(dāng)前時(shí)間戳+不可使用時(shí)間),其實(shí)這個(gè)方法就是做更新或者插入操作。
好了到這我們就把它這個(gè)收集響應(yīng)延遲指標(biāo)與計(jì)算可用時(shí)間這快就解析完了,再回頭看下那個(gè)選擇MessageQueue的方法:
可以看到它先是找那種可用的,然后不是上一個(gè)broker的那個(gè),如果好幾輪下來沒有找到的話就選擇一個(gè)
public String pickOneAtLeast() { // 將map中里面的放到tmpList 中 final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } // 如果不是null if (!tmpList.isEmpty()) { // 洗牌算法 Collections.shuffle(tmpList); // 排序 Collections.sort(tmpList); final int half = tmpList.size() / 2; // 沒有 2臺(tái)機(jī)器 if (half <= 0) { // 選擇第一個(gè) return tmpList.get(0).getName(); } else { // 有2臺(tái)機(jī)器及以上,某個(gè)線程內(nèi)隨機(jī)選排在前半段的broker final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null;}復(fù)制代碼
先是排序,然后將所有的broker/2 ,如果是小于等于0的話,說明就2個(gè)broker以下,選第一個(gè),如果是2臺(tái)以上,就輪詢選一個(gè)
先來看下排序規(guī)則:
/** * 失敗條目(規(guī)避規(guī)則條目) */class FaultItem implements Comparable<FaultItem> { // 條目唯一鍵,這里是brokerName private final String name; // todo currentLatency 和startTimestamp 被volatile修飾 // 本次消息發(fā)送的延遲時(shí)間 private volatile long currentLatency; // 故障規(guī)避的開始時(shí)間 private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } @Override public int compareTo(final FaultItem other) { // 將能提供服務(wù)的放前面 if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } // 找延遲低的 放前面 if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } // 找最近能提供服務(wù)的 放前面 if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; }復(fù)制代碼
它是把能提供服務(wù)的放前面,然后沒有,就找那種延遲低的放前面,也沒有的話就找最近能提供服務(wù)的放前頭。 找到這個(gè)broker 之后然后根據(jù)這個(gè)broker name 獲取寫隊(duì)列的個(gè)數(shù),其實(shí)你這個(gè)寫隊(duì)列個(gè)數(shù)有幾個(gè),然后你這個(gè)broker對(duì)應(yīng)的MessageQueue就有幾個(gè),如果write size >0的話,然后這個(gè)broker 不是null,就找一個(gè)mq,然后設(shè)置上它的broker name 與queue id
如果write<=0,直接移除這個(gè)broker對(duì)應(yīng)FaultItem,最后實(shí)在是找不到就按照上面那種普通方法來找了。
好了,到這我們延遲故障也介紹完成了。
原文鏈接:https://juejin.cn/post/7211072055780458533
以上就是關(guān)于領(lǐng)取pos機(jī)源碼,producer 容錯(cuò)機(jī)制的知識(shí),后面我們會(huì)繼續(xù)為大家整理關(guān)于領(lǐng)取pos機(jī)源碼的知識(shí),希望能夠幫助到大家!
