redis集群三種方式 redis線程池作用
Redis 6引入多線程IO,下面我們來和 Netty 的多線程模型進行對比分析思路:
- 初始化線程?
- 如何分配client給thread?
- 如何處理讀寫事件,在什么線程處理?
- 如何處理命令的邏輯,在什么線程處理?

文章插圖
用戶代碼
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true).childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue").handler(new ServerHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new AuthHandler());//..}});ChannelFuture f = b.bind(8888).sync();f.channel().closeFuture().sync();復制代碼初始化線程( ServerBootsrap.bind())netty初始化線程,創建一個 boss線程池,一個work線程池,并且給new了一個channel處理注冊線程的連接,并且為這個channel 添加了一個 ServerBootstrapAcceptor的channel 。- 操作線程: 主線程執行
- 執行時機: 初始化線程
- 執行代碼: ServerBootsrap.bind()
- 操作線程 : 主線程執行
- 執行時機 : 新連接接入
BOSS線程組的 NioEventLoop.run() 不斷檢查所有的管道,當管道狀態為可讀或者連接的時候就會讀取管道 。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}復制代碼然后就按 Channel 的責任鏈傳遞下去- unsafe.read()
- —->
- pipeline.fireChannelRead(byteBuf);
- —->
- ServerBootstrapAcceptor.channelRead()
- —->
- MultithreadEventLoopGroup.register(child) 分配一個線程給這個channel,一個線程可能擁有多個channel
DefaultEventExecutorChooserFactory.java 用線程個數取余來分配@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}AbstractChannel.java@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {// 重點!!! 這個線程就永遠被掛靠在channel上面了AbstractChannel.this.eventLoop = eventLoop;// 監聽讀事件 NIO底層的注冊register0(promise);}}復制代碼如何處理讀寫事件,在什么線程處理?ChannelInboundHandler.channelRead如何處理命令的邏輯,在什么線程處理?ChannelInboundHandler.channelRead
總結:Netty通過開始注冊一個Boss線程池(通常情況都是一個),來監聽(NioEventLoop,run)連接的channel,如果有channel來進行連接,就通過責任鏈找到ServerBootstrapAcceptor.channelRead() 分配給channel一個線程(NioEventLoop),這個線程(NioEventLoop)就會通過run()去不斷的 去讀Channel里面的,處理命令 。
Redis的多線程模型

文章插圖
初始化線程( initThreadedIO() 函數)
- 操作線程 :主線程執行
- 執行時機 :初始化線程
創建io_threads_num個線程(listCreate),并且對除主線程(id==0)以外的線程進行處理: (listCreate 創建一個線程)
- 初始化線程的等待任務為0
- 獲取鎖,使得線程不能進行操作
- 將線程tid與Redis中的線程id進行映射
/* Initialize the data structures needed for threaded I/O. */void initThreadedIO(void) {io_threads_active = 0; /* We start with threads not active. *//* Don't spawn any thread if the user selected a single thread:* we'll handle I/O directly from the main thread. */// 如果用戶沒有開啟多線程IO直接返回 使用主線程處理if (server.io_threads_num == 1) return;// 線程數設置超過上限if (server.io_threads_num > IO_THREADS_MAX_NUM) {serverLog(LL_WARNING,"Fatal: too many I/O threads configured. ""The maximum number is %d.", IO_THREADS_MAX_NUM);exit(1);}/* Spawn and initialize the I/O threads. */// 初始化io_threads_num個對應線程for (int i = 0; i < server.io_threads_num; i++) {/* Things we do for all the threads including the main thread. */io_threads_list[i] = listCreate();if (i == 0) continue; // Index 0為主線程,跳過/* Things we do only for the additional threads. */// 非主線程則需要以下處理pthread_t tid;// 為線程初始化生成對應的鎖pthread_mutex_init(&io_threads_mutex[i],NULL);// 線程等待狀態初始化為0io_threads_pending[i] = 0;// 初始化后將線程鎖住pthread_mutex_lock(&io_threads_mutex[i]);if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");exit(1);}// 將index和對應線程ID加以映射io_threads[i] = tid;}}復制代碼讀事件到來(readQueryFromClient)- 操作線程 :主線程執行
- 機制時機 :讀事件到來
// 讀取到一個客戶端的請求int postponeClientRead(client *c) {if (io_threads_active && // 線程是否在不斷(spining)等待IOserver.io_threads_do_reads && // 是否多線程IO讀取!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))){//client不能是主從,且未處于等待讀取的狀態// 將Client設置為等待讀取的狀態Flagc->flags |= CLIENT_PENDING_READ;// 把client加入到等待讀列表listAddNodeHead(server.clients_pending_read,c);return 1;} else {return 0;}}復制代碼這時server維護了一個 clients_pending_read,包含所有的讀事件 pending的客戶端列表 。如何分配client給thread(線程) (handleClientsWithPendingReadsUsingThreads)
- 操作線程 :主線程執行
- 執行時機 :執行處理事件之后
如果是長度不為0,進行while循環,將每個等待的client分配給線程,當等待長度超過線程時,每個線程分配給到的client可能超過1個;
int item_id = 0;while((ln = listNext(&li))) {client *c = listNodeValue(ln);// 在線程組取余int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}并且修改每個線程需要完成的數量(初始化為0):// 所有線程for (int j = 1; j < server.io_threads_num; j++) {// 拿出當前線程需要處理多少個客戶端int count = listLength(io_threads_list[j]);// 設置當前線程需要多少客戶端io_threads_pending[j] = count;}等待處理直到沒有剩余任務:while(1) {unsigned long pending = 0;// 拿出所有線程,查看線程是否還有需要的客戶端// 這里主要是監聽子線程是否完全處理好任務for (int j = 1; j < server.io_threads_num; j++)pending += io_threads_pending[j];if (pending == 0) break;}當本次IO的所有(讀/寫)線程處理完畢之后,清空client_pending_read:主線程會在這里處理命令listRewind(server.clients_pending_read,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_READ;if (c->flags & CLIENT_PENDING_COMMAND) {c->flags &= ~ CLIENT_PENDING_COMMAND;processCommandAndResetClient(c);}processInputBufferAndReplicate(c);}listEmpty(server.clients_pending_read);復制代碼如何處理讀請求(IOThreadMain)- 操作線程 :子線程
- 執行時機 : 子線程啟動時 while執行
Redis為每個客戶端分配了輸入緩沖區,它的作用是將客戶端發送的命令臨時保存,同時Redis從會輸入緩沖區拉取命令并執行,輸入緩沖區為客戶端發送命令到Redis執行命令提供了緩沖功能 。
Redis的 Thread IO 模型中,每次所有的線程都只能進行或者 寫/讀 操作,通過 io_threads_op控制 。同時每個線程負責的client一次執行:
// io thread主函數,在各個子線程執行void *IOThreadMain(void *myid) {// 線程 ID,跟普通線程池的操作方式一樣,都是通過 線程ID 進行操作long id = (unsigned long)myid;while(1) {/**這里的等待操作比較特殊,沒有使用簡單的 sleep,*避免了 sleep 時間設置不當可能導致糟糕的性能,*但是也有個問題就是頻繁 loop 可能一定程度上造成 cpu 占用較長*/for (int j = 0; j < 1000000; j++) {if (io_threads_pending[id] != 0) break;}// 根據線程 id 以及待分配列表進行 任務分配listIter li;listNode *ln;listRewind(io_threads_list[id],&li);// 有可能分配了兩個客戶端連接while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {// 當前全局處于寫事件時,向輸出緩沖區寫入響應內容writeToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {// 當前全局處于讀事件時,從輸入緩沖區讀取請求內容readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);io_threads_pending[id] = 0;if (tio_debug) printf("[%ld] Donen", id);}復制代碼readQeuryFromClient()->processInputBuffer(c)->processCommand() 進行command的分發和處理 。這里的readQueueFromClient只做寫入客戶端的輸入緩存區:
// 復制到 Client 緩存區else if (c->flags & CLIENT_MASTER) {c->pending_querybuf = sdscatlen(c->pending_querybuf,c->querybuf+qblen,nread);}void processInputBuffer(client *c) {while(c->qb_pos < sdslen(c->querybuf)) {// 如果我們在 IO線程(子線程)的時候// 我們不能直接執行命令,flags設置為CLIENT_PENDING_COMMAND// 然后讓主線程執行if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}}}復制代碼每個線程執行readQueryFromClient,將對應的請求放入一個隊列,單線程執行(從輸入緩存區讀取內容),線程將結果寫入客戶端的buff中 。每輪處理中,需要將各個線程的鎖開啟,并且將相關標志位:
void startThreadedIO(void) {if (tio_debug) { printf("S"); fflush(stdout); }if (tio_debug) printf("--- STARTING THREADED IO ---n");serverAssert(io_threads_active == 0);for (int j = 1; j < server.io_threads_num; j++)// 解開線程的鎖定狀態pthread_mutex_unlock(&io_threads_mutex[j]);// 現在可以開始多線程IO執行對應讀/寫任務io_threads_active = 1;}復制代碼結束時,首先檢查是否有是否有待讀的IO,如果沒有,將線程說的,標志關閉:void stopThreadedIO(void) {// 需要停止的時候可能還有等待讀的Client 在停止前進行處理handleClientsWithPendingReadsUsingThreads();if (tio_debug) { printf("E"); fflush(stdout); }if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---n",(int) listLength(server.clients_pending_read),(int) listLength(server.clients_pending_write));serverAssert(io_threads_active == 1);for (int j = 1; j < server.io_threads_num; j++)// 本輪IO結束 將所有線程上鎖pthread_mutex_lock(&io_threads_mutex[j]);// IO狀態設置為關閉io_threads_active = 0;}復制代碼總結:Threaded IO將服務讀Client的輸入緩存區和將執行結果寫入輸出緩沖區的過程改為了多線程模型,同時保持同一時間全部線程均處于讀或寫的狀態 。但是命令的具體執行以單線程(隊列)的形式 。因為Redis希望保持堅定結果避免鎖和競爭問題,并且讀寫緩存占用命令執行聲明周期的比重比大 ,處理這部分IO模型給性能帶來來顯著的提升 。Netty 和 Redis6 區別:如何分配client給thread?netty:當Boss監聽到連接事件,netty會給一個channel分配一個線程 。這個線程專門負責這條channel的讀寫事件,可以是解析也可以是執行命令
redis6:每當接收到一個讀事件,Client放到等待讀取的隊列 。在執行處理事件之后,主線程會 統一給線程池的線程分配client 。線程把client要讀buffer都放到client的緩存 。主線程等待所有 io線程執行完畢,主線程再執行client的緩存變成命令
如何處理讀寫事件,在什么線程處理?netty:在子線程執行,讀寫完直接執行邏輯 redis6:在子線程執行,讀寫完放在client的緩沖區
如何處理命令的邏輯,在什么線程處理?【redis集群三種方式 redis線程池作用】netty:在子線程執行,直接執行邏輯 redis6:在主線程執行,主線程遍歷等待隊列讀取緩沖區編譯成命令再執行
為什么redis選擇使用這個模式?

文章插圖
推薦閱讀
- 電腦截屏的三種方法 mac電腦截屏怎么操作
- 電瓶怎么看生產日期 三種看電瓶生產日期的方法
- 駕駛證注銷如何恢復 三種方法介紹
- 企業三種經營模式 商業模式的類型包括哪四種
- 什么是神仙水
- 拓撲關系有哪三種類型
- 驅鬼的三種方法 驅鬼方法有哪些?
- Linux虛擬機三種網絡配置 linux虛擬機網絡設置
- 怎么在迷你世界獲得皮膚 三種方法任你挑
- 王者榮耀新英雄夏洛特怎么獲得 這三種方式都是可以的
