想聊一聊流量控制,談?wù)劦闹匾裕鉀Q了哪些業(yè)務(wù)問題,那我們問題來進(jìn)入正題。
1、WEB容器如何流量控制?
一個(gè)Tomcat的容器,這個(gè)容器呢,部署在一臺(tái)服務(wù)器上面,同時(shí)這臺(tái)服務(wù)器的資源非常非常有限,這臺(tái)服務(wù)器只能同時(shí)讓500個(gè)請(qǐng)求訪問,若是多余500個(gè)的話,這樣服務(wù)器的資源就會(huì)打滿,那么我們肯定需要想辦法這些問題的。Tomcat本身就有這樣的機(jī)制,因?yàn)槊恳粋€(gè)請(qǐng)求過來后,tomcat會(huì)為這個(gè)請(qǐng)求分配一個(gè)處理線程,所以tomcat就是來控制處理線程的數(shù)量。
server.xml
1 2 3 4 5 6 7 8 9 10 11 | <Connector executor= "tomcatThreadPool" port= "8080" protocol= "HTTP/1.1" connectionTimeout= "8000" enableLookups= "false" acceptorThreadCount= "1" URIEncoding= "utf-8" redirectPort= "443" compression= "on" compressionMinSize= "1024" compressableMimeType= "text/html,text/xml,text/javascript,text/css,text/plain,application/json,application/xml" /> <Executor className= "StandardThreadExecutor" name= "tomcatThreadPool" namePrefix= "catalina-exec-" maxThreads= "500" minSpareThreads= "100" /> |
maxThreads="500" 表示最多能同時(shí)并存500個(gè)處理線程。
acceptCount="500" 表示在500個(gè)處理線程在占用的情況中,還允許500個(gè)請(qǐng)求的排隊(duì)。
這兩個(gè)參數(shù)基本就是Tomcat在線程保護(hù)當(dāng)中的策略。
2、一個(gè)WEB容器里面如何進(jìn)行具體的業(yè)務(wù)模塊的線程保護(hù)呢?
一個(gè)業(yè)務(wù)系統(tǒng)部署在一個(gè)Tomcat中,例如這個(gè)業(yè)務(wù)系統(tǒng)有兩個(gè)重要模塊(A和B模塊),這個(gè)兩個(gè)模塊的請(qǐng)求都需要有資源處理,而不是那一個(gè)模塊把系統(tǒng)的資源都占用去,例如:A模塊限制最多300請(qǐng)求,B模塊最多300個(gè)請(qǐng)求。這樣場(chǎng)景的出現(xiàn)時(shí),我們就需要考慮說A模塊最多只能有300個(gè)處理線程,B也是這樣,那么Tomcat是可以保證資源層面的,A+B共有500個(gè),而無法確保A/B各300個(gè),所以有如下想法:
1、每個(gè)請(qǐng)求進(jìn)來確定是屬于A還是屬于B。
2、當(dāng)前正在運(yùn)行的A/B模塊的數(shù)量。
基于上面想法的具體實(shí)現(xiàn):
流量控制的業(yè)務(wù)實(shí)現(xiàn)(TrafficControl.java):
/** * 簡(jiǎn)單的實(shí)現(xiàn)基于URL的流控 */public class TrafficControl { //一個(gè)url請(qǐng)求的最大訪問數(shù)量為300 private final static int ONE_URI_MAX_CONCURRENT = 300;//所有url請(qǐng)求的最大訪問數(shù)量為500 private final static int ALL_URI_MAX_CONCURRENT = 500; private final static AtomicInteger all_url_concurrent = new AtomicInteger(0); private final static ConcurrentMap<String, AtomicInteger> url_concurrent_map = new ConcurrentHashMap<String, AtomicInteger>(); private final static SwitcherManager switcherManager = SwitcherManagerFactoryLoader.getSwitcherManagerFactory().getSwitcherManager(); private final static Switcher tcEnabled = switcherManager.registerSwitcher("feature.trackurl.traffic_control.enable", true); public static boolean isDisabled() { return tcEnabled.isClose(); } public static boolean isOverflow(String uri) { if (all_url_concurrent.get() > ALL_URI_MAX_CONCURRENT) { return true; } AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null && one_url_concurrent.get() > ONE_URI_MAX_CONCURRENT) { return true; } return false; } public static void startAccess(String uri) { all_url_concurrent.incrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.incrementAndGet(); } else { url_concurrent_map.putIfAbsent(uri, new AtomicInteger(1)); } } public static void endAccess(String uri) { all_url_concurrent.decrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.decrementAndGet(); } } public static void dumpWarnLog() { String lineSeparator = System.getProperty("line.separator"); // 估算每一個(gè)URL和其計(jì)數(shù)占用32個(gè)字符 StringBuilder sb = new StringBuilder((1 + url_concurrent_map.size()) * 32); sb.append("all_url_concurrent : ").append(all_url_concurrent); for (Map.Entry<String, AtomicInteger> entry : url_concurrent_map.entrySet()) { sb.append(lineSeparator).append('\t').append(entry.getKey()).append(" : ").append(entry.getValue()); } ApiLogger.warn(sb); } }
Servlet的實(shí)現(xiàn)(TrafficControlServlet.java):
/** * 帶有流量控制的Servlet */public class TrafficControlServlet extends HttpServlet { @Override protected final void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (TrafficControl.isDisabled()) { super.service(req, resp); return; } String uri = req.getRequestURI(); if (TrafficControl.isOverflow(uri)) { TrafficControl.dumpWarnLog(); resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } try { TrafficControl.startAccess(uri); super.service(req, resp); } finally { TrafficControl.endAccess(uri); } } }
最后執(zhí)行Servlet如下,繼承于TrafficControlServlet。
public class TestServlet extends TrafficControlServlet { private static final long serialVersionUID = 2895590140869067830L; @Override protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException { //................. } @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException { doGet(request, response); }
http://www.cnblogs.com/ficohu/p/6819292.html