網上有很多關于pos機企業(yè)源碼,Redis源碼分析的知識,也有很多人為大家解答關于pos機企業(yè)源碼的問題,今天pos機之家(www.tjfsxbj.com)為大家整理了關于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機企業(yè)源碼
前言前言宏觀梳理啟動過程網絡層協議層業(yè)務層在保存到dict 的過程中,數據的形態(tài)也一直在變化定義新的數據類型小結參考《Apache Kafka源碼分析》——server服務端網絡開發(fā)的基本套路
宏觀梳理整個軸線是RedisServer 初始化并啟動eventloop, eventLoop 創(chuàng)建redisClient 及驅動processCommand 方法進而 執(zhí)行redisCommand 向 dict 中保存數據
本文 以一個SET KEY VALUE 來分析redis的 啟動和保存流程
啟動過程REDIS.c
int main(int argc, char **argv) {...// 初始化服務器initServerConfig();...// 將服務器設置為守護進程if (server.daemonize) daemonize();// 創(chuàng)建并初始化服務器數據結構initServer();...// 運行事件處理器,一直到服務器關閉為止aeSetBeforeSleepProc(server.el,beforeSleep);aeMain(server.el);// 服務器關閉,停止事件循環(huán)aeDeleteEventLoop(server.el);return 0}網絡層
Redis的網絡監(jiān)聽沒有采用libevent等,而是自己實現了一套簡單的機遇event驅動的API,具體見ae.c。事件處理器的主循環(huán)
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {// 如果有需要在事件處理前執(zhí)行的函數,那么運行它if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 開始處理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);}}
Redis 中的事件循環(huán)
int aeProcessEvents(aeEventLoop *eventLoop, int flags){struct timeval tv, *tvp;... // 獲取最近的時間事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果時間事件存在的話,那么根據最近可執(zhí)行時間事件和現在時間的時間差來決定文件事件的阻塞時間 // 計算距今最近的時間事件還要多久才能達到,并將該時間距保存在 tv 結構中 aeGetTime(&now_sec, &now_ms); } else { // 執(zhí)行到這一步,說明沒有時間事件,那么根據 AE_DONT_WAIT 是否設置來決定是否阻塞,以及阻塞的時間長度 } // 處理文件事件,阻塞時間由 tvp 決定// 類似于 java nio 中的select numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 從已就緒數組中獲取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; // 讀事件 if (fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } }// 執(zhí)行時間事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);}
這個event loop的邏輯可不孤單,netty中也有類似的EventLoop 中的 Loop 到底是什么?
Redis 中會處理兩種事件:時間事件和文件事件。在每個事件循環(huán)中 Redis 都會先處理文件事件,然后再處理時間事件直到整個循環(huán)停止。 aeApiPoll 可看做文件事件的生產者(還有一部分文件事件來自accept等),processEvents 和 processTimeEvents 作為 Redis 中發(fā)生事件的消費者,每次都會從“事件池”(aeEventLoop的幾個列表字段)中拉去待處理的事件進行消費。
關于C/C++ Linux后端開發(fā)網絡底層原理知識 點擊 正在跳轉 獲取,內容知識點包括Linux,Nginx,ZeroMQ,MySQL,Redis,線程池,MongoDB,ZK,Linux內核,CDN,P2P,epoll,Docker,Tcp/IP,協程,DPDK等等。
協議層Redis 通信協議
我們以讀事件為例,但發(fā)現數據可讀時,執(zhí)行了 fe->rfileProc(eventLoop,fd,fe->clientData,mask);,那么rfileProc 的執(zhí)行邏輯是啥呢?
initServer ==> aeCreateFileEvent. 初始化server 時,創(chuàng)建aeCreateFileEvent(aeFileEvent的一種),當accept (可讀事件的一種)就緒時,觸發(fā)aeCreateFileEvent->rfileProc 方法 也就是 acceptTcpHandler// redis.c void initServer() { ... // 為 TCP 連接關聯連接應答(accept)處理器,用于接受并應答客戶端的 connect() 調用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR){...} } ... }創(chuàng)建客戶端,并綁定讀事件到loop:acceptTcpHandler ==> createClient ==> aeCreateFileEvent ==> readQueryFromClient
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; ... while(max--) { // accept 客戶端連接 cfd = anetTcpAccept(server.neterr, fd, cip, sizEOF(cip), &cport); if (cfd == ANET_ERR) { ... return; } // 為客戶端創(chuàng)建客戶端狀態(tài)(redisClient) acceptCommonHandler(cfd,0); } } static void acceptCommonHandler(int fd, int flags) { // 創(chuàng)建客戶端 redisClient *c; if ((c = createClient(fd)) == NULL) { ... close(fd); /* May be already closed, just ignore errors */ return; } // 如果新添加的客戶端令服務器的最大客戶端數量達到了,那么向新客戶端寫入錯誤信息,并關閉新客戶端 // 先創(chuàng)建客戶端,再進行數量檢查是為了方便地進行錯誤信息寫入 ... } redisClient *createClient(int fd) { // 分配空間 redisClient *c = zmalloc(sizeof(redisClient)); if (fd != -1) { ... //綁定讀事件到事件 loop (開始接收命令請求) if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR){ // 清理/關閉資源退出 } } // 初始化redisClient其它數據 }拼接和分發(fā)命令數據 readQueryFromClient ==> processInputBuffer ==> processCommand
networking.c void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; // 獲取查詢緩沖區(qū)當前內容的長度 // 如果讀取出現 short read ,那么可能會有內容滯留在讀取緩沖區(qū)里面 // 這些滯留內容也許不能完整構成一個符合協議的命令, qblen = sdslen(c->querybuf); // 如果有需要,更新緩沖區(qū)內容長度的峰值(peak) if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 為查詢緩沖區(qū)分配空間 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 讀入內容到查詢緩存 nread = read(fd, c->querybuf+qblen, readlen); // 讀入出錯 // 遇到 EOF if (nread) { // 根據內容,更新查詢緩沖區(qū)(sds) free 和 len 屬性 // 并將 '\\0' 正確地放到內容的最后 sdsIncrLen(c->querybuf,nread); // 記錄服務器和客戶端最后一次互動的時間 c->lastinteraction = server.unixtime; // 如果客戶端是 master 的話,更新它的復制偏移量 if (c->flags & REDIS_MASTER) c->reploff += nread; } else { // 在 nread == -1 且 errno == EAGAIN 時運行 server.current_client = NULL; return; } // 查詢緩沖區(qū)長度超出服務器最大緩沖區(qū)長度 // 清空緩沖區(qū)并釋放客戶端 // 從查詢緩存重讀取內容,創(chuàng)建參數,并執(zhí)行命令 // 函數會執(zhí)行到緩存中的所有內容都被處理完為止 processInputBuffer(c); server.current_client = NULL; } // 處理客戶端輸入的命令內容 void processInputBuffer(redisClient *c) { // 盡可能地處理查詢緩沖區(qū)中的內容 while(sdslen(c->querybuf)) { ... // 判斷請求的類型 // 簡單來說,多條查詢是一般客戶端發(fā)送來的, // 而內聯查詢則是 TELNET 發(fā)送來的 if (!c->reqtype) { if (c->querybuf[0] == '*') { // 多條查詢 c->reqtype = REDIS_REQ_MULTIBULK; } else { // 內聯查詢 c->reqtype = REDIS_REQ_INLINE; } } // 將緩沖區(qū)中的內容轉換成命令,以及命令參數 if (c->reqtype == REDIS_REQ_INLINE) { if (processInlineBuffer(c) != REDIS_OK) break; } else if (c->reqtype == REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != REDIS_OK) break; } else { redisPanic("Unknown request type"); } ... } } redis.c int processCommand(redisClient *c) { // 特別處理 quit 命令 // 查找命令,并進行命令合法性檢查,以及命令參數個數檢查 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 沒找到指定的命令 或 參數個數錯誤 直接返回 // 檢查認證信息 // 如果開啟了集群模式,那么在這里進行轉向操作。 // 如果設置了最大內存,那么檢查內存是否超過限制,并做相應的操作 // 如果這是一個主服務器,并且這個服務器之前執(zhí)行 BGSAVE 時發(fā)生了錯誤 // 那么不執(zhí)行寫命令 // 如果服務器沒有足夠多的狀態(tài)良好服務器 // 并且 min-slaves-to-write 選項已打開 // 如果這個服務器是一個只讀 slave 的話,那么拒絕執(zhí)行寫命令 // 在訂閱于發(fā)布模式的上下文中,只能執(zhí)行訂閱和退訂相關的命令 /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ // 如果服務器正在載入數據到數據庫,那么只執(zhí)行帶有 REDIS_CMD_LOADING // 標識的命令,否則將出錯 /* Lua script too slow? Only allow a limited number of commands. */ // Lua 腳本超時,只允許執(zhí)行限定的操作,比如 SHUTDOWN 和 SCRIPT KILL /* Exec the command */ if (c->flags & REDIS_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 在事務上下文中除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外 // 其他所有命令都會被入隊到事務隊列中 queueMultiCommand(c); addReply(c,shared.queued); } else { // 執(zhí)行命令 call(c,REDIS_CALL_FULL); c->woff = server.master_repl_offset; // 處理那些解除了阻塞的鍵 if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return REDIS_OK; }業(yè)務層
redis.c// 調用命令的實現函數,執(zhí)行命令void call(redisClient *c, int flags) {// start 記錄命令開始執(zhí)行的時間// 記錄命令開始執(zhí)行前的 FLAG// 如果可以的話,將命令發(fā)送到 MONITOR/* Call the command. */c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);redisOpArrayInit(&server.also_propagate);// 保留舊 dirty 計數器值dirty = server.dirty;// 計算命令開始執(zhí)行的時間start = ustime();// 執(zhí)行實現函數c->cmd->proc(c);// 計算命令執(zhí)行耗費的時間duration = ustime()-start;// 計算命令執(zhí)行之后的 dirty 值dirty = server.dirty-dirty;...// 如果有需要,將命令放到 SLOWLOG 里面// 更新命令的統(tǒng)計信息...server.stat_numcommands++;}redis.cstruct redisCommand redisCommandTable[] = {{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},...}
t_string.c/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */void setCommand(redisClient *c) {int j;robj *expire = NULL;int unit = UNIT_SECONDS;int flags = REDIS_SET_NO_flags;// 設置選項參數// 嘗試對值對象進行編碼c->argv[2] = tryObjectEncoding(c->argv[2]);setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);}void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {long long milliseconds = 0; /* initialized to avoid any harmness warning */// 取出過期時間// 如果設置了 NX 或者 XX 參數,那么檢查條件是否不符合這兩個設置// 在條件不符合時報錯,報錯的內容由 abort_reply 參數決定// 將鍵值關聯到數據庫setKey(c->db,key,val);// 將數據庫設為臟// 為鍵設置過期時間if (expire) setExpire(c->db,key,mstime()+milliseconds);// 發(fā)送事件通知// 設置成功,向客戶端發(fā)送回復}db.cvoid setKey(redisDb *db, robj *key, robj *val) {// 添加或覆寫數據庫中的鍵值對if (lookupKeyWrite(db,key) == NULL) {dbAdd(db,key,val);} else {dbOverwrite(db,key,val);}incrRefCount(val);// 移除鍵的過期時間removeExpire(db,key);// 發(fā)送鍵修改通知signalModifiedKey(db,key);}
前面說過, 命令實現函數會將命令回復保存到客戶端的輸出緩沖區(qū)里面, 并為客戶端的套接字關聯命令回復處理器, 當客戶端套接字變?yōu)榭蓪憼顟B(tài)時, 服務器就會執(zhí)行命令回復處理器, 將保存在客戶端輸出緩沖區(qū)中的命令回復發(fā)送給客戶端。
當命令回復發(fā)送完畢之后, 回復處理器會清空客戶端狀態(tài)的輸出緩沖區(qū), 為處理下一個命令請求做好準備。
在保存到dict 的過程中,數據的形態(tài)也一直在變化相關的數據結構
struct redisClient {// 查詢緩沖區(qū)sds querybuf;// 參數數量int argc;// 參數對象數組robj **argv;}typedef struct redisObject {// 類型unsigned type:4;// 編碼unsigned encoding:4;// 對象最后一次被訪問的時間unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */// 引用計數int refcount;// 指向實際值的指針void *ptr;} robj;
轉換的代碼
networking.c// 將 c->querybuf 中的協議內容轉換成 c->argv 中的參數對象int processMultibulkBuffer(redisClient *c) {// 讀入命令的參數個數// 比如 *3\\$3\\SET\\... 將令 c->multibulklen = 3if (c->multibulklen == 0) {// 檢查緩沖區(qū)的內容第一個 "\\"newline = strchr(c->querybuf,'\');if (newline == NULL) {...return REDIS_ERR;}// 協議的第一個字符必須是 '*'// 將參數個數,也即是 * 之后, \\ 之前的數字取出并保存到 ll 中// 比如對于 *3\\ ,那么 ll 將等于 3ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);// 參數的數量超出限制// 設置參數數量// 根據參數數量,為各個參數對象分配空間if (c->argv) zfree(c->argv);c->argv = zmalloc(sizeof(robj*)*c->multibulklen);}// 從 c->querybuf 中讀入參數,并創(chuàng)建各個參數對象到 c->argvwhile(c->multibulklen) {// 讀入參數長度if (c->bulklen == -1) {// 確保 "\\" 存在// 確保協議符合參數格式,檢查其中的 $...// 讀取長度// 比如 $3\\SET\\ 將會讓 ll 的值設置 3ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);...// 參數的長度c->bulklen = ll;}// 讀入參數// 確保內容符合協議格式// 為參數創(chuàng)建字符串對象 if (pos == 0 &&c->bulklen >= REDIS_MBULK_BIG_ARG &&(signed) sdslen(c->querybuf) == c->bulklen+2){c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);sdsIncrLen(c->querybuf,-2); /* remove CRLF */c->querybuf = sdsempty();/* Assume that if we saw a fat argument we'll see another one* likely... */c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);pos = 0;} else {c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);pos += c->bulklen+2;}// 清空參數長度// 減少還需讀入的參數個數c->multibulklen--; }// 從 querybuf 中刪除已被讀取的內容// 如果本條命令的所有參數都已讀取完,那么返回// 如果還有參數未讀取完,那么就協議內容有錯}
object.c
robj *createObject(int type, void *ptr) {robj *o = zmalloc(sizeof(*o));o->type = type;o->encoding = REDIS_ENCODING_RAW;o->ptr = ptr;o->refcount = 1;/* Set the LRU to the current lruclock (minutes resolution). */o->lru = LRU_CLOCK();return o;}最開始命令數據在redisClient->querybuf 中以字符串形式存在processMultibulkBuffer 然后字符串 數據被拆分為 redisObject 保存在 redisClient->argv[1],redisClient->argv[2],當然redisObject 的類型仍被標記為字符串t_string.c setCommand 對值對象進行編碼到db.c 時,setKey(robj *key,robj *val)dict.c dictAdd(void *key, void *val) key 已被轉換為 sds。定義新的數據類型
來自 《Redis核心技術與實現》
定義新數據類型的底層結構,可以自己創(chuàng)建和命名.h 和 .c 文件在 RedisObject 的 type 屬性中,增加這個新類型的定義。在 Redis 的 server.h 文件中開發(fā)新類型的創(chuàng)建和釋放函數。主要是用 zmalloc 做底層結構分配空間。開發(fā)新類型的命令操作。在 server.c 文件中的 redisCommandTable 里面,把新增命令和實現函數關聯起來。小結如果你看到一個新東西,卻沒有理清它的邏輯,直到打通你已熟悉的東西(學名叫已有的知識體系),那肯定是沒有真正理解它。 在redis 源碼分析這里,你已知的是各種內存操作(即業(yè)務層部分),未知的網絡層到業(yè)務層的通路。
以上就是關于pos機企業(yè)源碼,Redis源碼分析的知識,后面我們會繼續(xù)為大家整理關于pos機企業(yè)源碼的知識,希望能夠幫助到大家!
