POS Rack (pos機架)

News 3  |  2023-09-07 09:37  |  Posted by: pos機之家

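There is a lot of material online about pos機架 and about "Walking you through the HDFS file read process", and many people have answered questions about pos機架. Today pos機之家 (www.tjfsxbj.com) has gathered what is known on this topic; let's take a look!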


Preface

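The java.io.InputStream class declares an abstract read method that reads from an open InputStream instance. Each call returns the next byte of data, as an int from 0 to 255, or -1 at the end of the stream. It is declared as follows: public abstract int read() throws IOException; Hadoop's DFSClient.DFSInputStream class implements this abstract method. Once we understand how a single byte of a single block of an HDFS file is read, higher layers only need to repeat that step to obtain all of the file's data. In the earlier article "HDFS File Read Process Analysis: Obtaining a File's Block List", we obtained the block list of a file and opened the file. The next step is to read the actual physical block data, which we explain below from several angles.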
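The delegation described above can be sketched in plain Java. This is a minimal illustration, not Hadoop code: the class name `OneByteReader` is hypothetical, and a `ByteArrayInputStream` stands in for a DFSInputStream. It shows why a single-byte `read()` masks with `0xff` (Java bytes are signed, while `read()` must return 0 to 255, or -1 at end of stream) and how a caller obtains the whole file by looping until -1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Hypothetical wrapper: implements read() on top of a bulk read, the same way
// DFSInputStream.read() delegates to read(oneByteBuf, 0, 1).
class OneByteReader extends InputStream {
    private final InputStream bulk;            // stands in for the block-level reader
    private final byte[] oneByteBuf = new byte[1];

    OneByteReader(InputStream bulk) { this.bulk = bulk; }

    @Override
    public int read() throws IOException {
        int ret = read(oneByteBuf, 0, 1);
        // Mask to 0..255: a raw (byte) 0xFF would otherwise become -1 and look like EOF.
        return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        return bulk.read(buf, off, len);
    }

    // Reads the entire stream one byte at a time, as a "top layer" would.
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, (byte) 0xFF, 7};
        byte[] copy = readAll(new OneByteReader(new ByteArrayInputStream(data)));
        System.out.println(Arrays.toString(copy)); // [1, -1, 7]
    }
}
```

Without the mask, the byte 0xFF would be returned as -1 and the loop would stop early, silently truncating the data.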

How the client reads one byte of a file from a Datanode

Below, we analyze the code in DFSClient.DFSInputStream that reads the content of a file on HDFS, starting with the following method:

@Override
public synchronized int read() throws IOException {
  int ret = read( oneByteBuf, 0, 1 );
  return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}

The call read(oneByteBuf, 0, 1) above reads one byte into the single-byte buffer oneByteBuf. It is implemented as follows:

@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
  checkOpen(); // check that the client is still running
  if (closed) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  if (pos < getFileLength()) { // getFileLength() returns the file's total size in bytes; pos is the 0-based offset of the next byte to read
    int retries = 2;
    while (retries > 0) {
      try {
        if (pos > blockEnd) { // blockEnd is the file offset of the last byte of the current block
          currentNode = blockSeekTo(pos); // find a Datanode holding the byte at offset pos (located through the metadata of the block that contains it)
        }
        int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
        int result = readBuffer(buf, off, realLen); // read up to realLen bytes into buf
        if (result >= 0) {
          pos += result; // advance pos by the number of bytes actually read
        } else {
          // got an EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (stats != null && result != -1) {
          stats.incrementBytesRead(result);
        }
        return result;
      } catch (ChecksumException ce) {
        throw ce;
      } catch (IOException e) {
        if (retries == 1) {
          LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
        }
        blockEnd = -1;
        if (currentNode != null) { addToDeadNodes(currentNode); }
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
  return -1;
}

Reading one byte of file data proceeds as follows:

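1. Check that the stream is open (the file's block list metadata has already been obtained and an InputStream opened).
2. Starting from the block that contains the current position (the first block when reading from the beginning of the file), find a Datanode that holds it. This is looked up in the cached block list; if it is not there, one RPC to the Namenode retrieves it.
3. Open a stream to that Datanode, ready to read the block's data.
4. With the connection established, read the data into the byte buffer and return the number of bytes read (1 byte when called from read()).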

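Reading works at byte granularity: for a given offset, the client determines which block the byte belongs to (from the byte range covered by each block's metadata), and then uses that block to locate a Datanode. Repeating this lets it read all of a file's data continuously, that is, the consecutive blocks that make up the file.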
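The offset-to-block mapping can be sketched as a binary search over the blocks' start offsets. This is a hypothetical, simplified model: the class `BlockMap` and its arrays are not Hadoop APIs, and in DFSClient the equivalent lookup is done by getBlockAt against the client's cached block list. The sketch also derives offsetIntoBlock and blockEnd the way blockSeekTo and read() use them, and clamps a request so it never crosses a block boundary, mirroring realLen = min(len, blockEnd - pos + 1).

```java
import java.util.Arrays;

// Hypothetical model of a file as consecutive blocks, each described by
// its start offset in the file and its length in bytes.
class BlockMap {
    private final long[] startOffsets; // sorted ascending, first entry is 0
    private final long[] lengths;

    BlockMap(long[] startOffsets, long[] lengths) {
        this.startOffsets = startOffsets;
        this.lengths = lengths;
    }

    // Index of the block that contains file offset pos.
    int blockIndexOf(long pos) {
        int i = Arrays.binarySearch(startOffsets, pos);
        // Exact hit: pos is the first byte of block i.
        // Miss: binarySearch returns -(insertionPoint) - 1, and the containing
        // block is the one just before the insertion point.
        return (i >= 0) ? i : -i - 2;
    }

    // Offset of pos relative to the start of its block (offsetIntoBlock).
    long offsetIntoBlock(long pos) {
        return pos - startOffsets[blockIndexOf(pos)];
    }

    // File offset of the last byte of the block containing pos (blockEnd).
    long blockEnd(long pos) {
        int i = blockIndexOf(pos);
        return startOffsets[i] + lengths[i] - 1;
    }

    // Bytes a single read may return without crossing into the next block.
    int clampToBlock(long pos, int len) {
        return (int) Math.min((long) len, blockEnd(pos) - pos + 1L);
    }
}
```

For example, with three blocks of 128, 128 and 100 bytes, offset 130 falls in block 1 at offsetIntoBlock 2, and a 64-byte read starting at offset 250 is cut to 6 bytes so that it stops at byte 255, the last byte of block 1.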

Finding the Datanode that holds the byte to be read

The method public synchronized int read(byte buf[], int off, int len) throws IOException shown above calls blockSeekTo to obtain the Datanode that holds the data at a given byte offset of the file. As one would expect, this Datanode is determined from block metadata: the client finds the block containing the offset and then looks up that block's Datanode list in its cached block mapping. Here is the implementation of blockSeekTo:

private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  ... ...
  DatanodeInfo chosenNode = null;
  int refetchToken = 1; // only need to get a new access token once
  while (true) {
    LocatedBlock targetBlock = getBlockAt(target, true); // metadata of the block containing the byte at offset target
    assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
    long offsetIntoBlock = target - targetBlock.getStartOffset();
    DNAddrPair retval = chooseDataNode(targetBlock); // choose a Datanode to read from
    chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;
    // Try a local read first; if the data is not local, read from the remote Datanode as usual
    Block blk = targetBlock.getBlock();
    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
    if (shouldTryShortCircuitRead(targetAddr)) {
      try {
        blockReader = getLocalBlockReader(conf, src, blk, accessToken,
            chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // create a BlockReader that reads the local replica
        return chosenNode;
      } catch (AccessControlException ex) {
        LOG.warn("Short circuit access failed ", ex);
        //Disable short circuit reads
        shortCircuitLocalReads = false;
      } catch (IOException ex) {
        if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
          /* Get a new access token and retry. */
          refetchToken--;
          fetchBlockAt(target);
          continue;
        } else {
          LOG.info("Failed to read " + targetBlock.getBlock()
              + " on local machine" + StringUtils.stringifyException(ex));
          LOG.info("Try reading via the datanode on " + targetAddr);
        }
      }
    }
    // The local read failed or was not possible: fall back to reading from the remote Datanode
    try {
      s = socketFactory.createSocket();
      LOG.debug("Connecting to " + targetAddr);
      NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
      s.setSoTimeout(socketTimeout);
      blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
          accessToken,
          blk.getGenerationStamp(),
          offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
          buffersize, verifyChecksum, clientName); // create a remote BlockReader
      return chosenNode;
    } catch (IOException ex) {
      if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
        refetchToken--;
        fetchBlockAt(target);
      } else {
        LOG.warn("Failed to connect to " + targetAddr
            + ", add to deadNodes and continue" + ex);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connection failure", ex);
        }
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode); // on failure, record the chosen Datanode in the client's dead-node list so the next choice skips it
      }
      if (s != null) {
        try {
          s.close();
        } catch (IOException iex) { }
      }
      s = null;
    }
  }
}

The code above involves the following key points:

Choosing a suitable Datanode to improve read efficiency

When a file is read, the client first obtains the file's block list metadata from the Namenode. For each block, the Datanodes in the returned list are sorted by network topology (the local node first, then nodes on the same rack). The client also maintains a dead-node list: the first Datanode in a block's list that is not on the dead-node list is returned and used to read the data.
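The selection rule in this paragraph (take the first location, in topology-sorted order, that is not in the client's dead-node set) can be sketched as follows. The `NodeChooser` class and its string node names are hypothetical; the real chooseDataNode also refetches block locations from the Namenode and retries, which this sketch leaves out.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of picking a replica for one block.
class NodeChooser {
    private final Set<String> deadNodes = new HashSet<>();

    NodeChooser(String... initiallyDead) {
        deadNodes.addAll(Arrays.asList(initiallyDead));
    }

    void addToDeadNodes(String node) {
        deadNodes.add(node);
    }

    // locations are assumed to be sorted already by the Namenode:
    // local node first, then same-rack nodes, then other racks.
    String chooseDataNode(List<String> locations) throws IOException {
        for (String node : locations) {
            if (!deadNodes.contains(node)) {
                return node; // nearest replica not known to be dead
            }
        }
        throw new IOException("No live nodes contain current block");
    }
}
```

Because the list is already ordered by distance, a linear scan is enough: the first survivor is also the closest one.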

If the client runs on a cluster Datanode, try reading the block locally

chooseDataNode returns a Datanode. If that node's address is a local address and the block's metadata on that node is not in the under-construction state, the conditions for a local read are met, and a local BlockReader (a BlockReaderLocal) is created to read the block directly from local disk. While creating it, the client first looks in a cache for a LocalDatanodeInfo object for the local Datanode. This object holds the essential information for reading from the local Datanode and caches the local paths of the blocks to be read, as its fields show:

private ClientDatanodeProtocol proxy = null;
private final Map<Block, BlockLocalPathInfo> cache;

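If the cache already holds information for the block to be read, the block can be read right away. Otherwise, the client creates a proxy object, obtains the block's path information (BlockLocalPathInfo), and adds it to the cache to speed up later reads. When the LocalDatanodeInfo information (in particular the BlockLocalPathInfo) is not found in the cache, the following logic runs: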

// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);

Here proxy is of type ClientDatanodeProtocol, the RPC protocol between the client and a Datanode. The RPC call getBlockLocalPathInfo returns the block's local path information; its implementation in the Datanode class is:

BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);

The Datanode calls getBlockLocalPathInfo on FSDataset (which implements the FSDatasetInterface interface):

@Override //FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
    throws IOException {
  File datafile = getBlockFile(block); // path of the block file on the local Datanode's file system
  File metafile = getMetaFile(datafile, block); // path of the block's metadata (checksum) file on the local file system
  BlockLocalPathInfo info = new BlockLocalPathInfo(block,
      datafile.getAbsolutePath(), metafile.getAbsolutePath());
  return info;
}
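The two paths returned by getBlockLocalPathInfo follow a naming convention on the Datanode's disk. The sketch below assumes the Hadoop 1.x layout, where a block file is named `blk_<blockId>` and its checksum file `blk_<blockId>_<generationStamp>.meta` sits next to it; treat these exact names as an assumption to check against the FSDataset source of your Hadoop version. The `BlockPaths` class is hypothetical.

```java
import java.io.File;

// Hypothetical helper reproducing the (assumed) Hadoop 1.x on-disk naming.
class BlockPaths {
    static File blockFile(File dir, long blockId) {
        return new File(dir, "blk_" + blockId);
    }

    // The meta file lives next to the block file and embeds the generation stamp,
    // so a replica with a stale generation stamp has a different meta file name.
    static File metaFile(File blockFile, long generationStamp) {
        return new File(blockFile.getPath() + "_" + generationStamp + ".meta");
    }
}
```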

The client can then read the block file directly (if checksums are to be verified, it also reads the block's metadata file, metafile):

... // static newBlockReader method of the BlockReaderLocal class
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);

if (!skipChecksum) { // the block's checksums are to be verified
  // get the metadata file
  File metafile = new File(pathinfo.getMetaPath());
  checksumIn = new FileInputStream(metafile);

  // read and handle the common header here. For now just a version
  BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
  short version = header.getVersion();
  if (version != FSDataset.METADATA_VERSION) {
    LOG.warn("Wrong version (" + version + ") for metadata file for "
        + blk + " ignoring ...");
  }
  DataChecksum checksum = header.getChecksum();
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
      pathinfo, checksum, true, dataIn, checksumIn);
} else {
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
      pathinfo, dataIn);
}
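When skipChecksum is false, the data is verified against the meta file. A minimal model of that check, assuming the Hadoop 1.x meta-file layout (a 2-byte version, a 1-byte checksum type, a 4-byte bytesPerChecksum, then one 4-byte CRC32 per chunk of data), is shown below. The `ChunkVerifier` class and the version and type values it writes are hypothetical; it builds its meta bytes in memory instead of reading a file. `java.util.zip.CRC32` is the standard JDK class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Hypothetical model of per-chunk CRC32 verification against a meta stream.
class ChunkVerifier {
    // Assumed layout: version, checksum type, bytesPerChecksum, then one CRC per chunk.
    static byte[] buildMeta(byte[] data, int bytesPerChecksum) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(1);              // metadata version (assumed value)
        out.writeByte(1);               // checksum type code for CRC32 (assumed value)
        out.writeInt(bytesPerChecksum);
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            CRC32 crc = new CRC32();
            crc.update(data, off, Math.min(bytesPerChecksum, data.length - off));
            out.writeInt((int) crc.getValue()); // low 32 bits of the CRC
        }
        out.flush();
        return bos.toByteArray();
    }

    // Returns the index of the first corrupt chunk, or -1 if every chunk matches.
    static int firstBadChunk(byte[] data, byte[] meta) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(meta));
        in.readShort();                 // version (a real reader would check it)
        in.readByte();                  // checksum type
        int bytesPerChecksum = in.readInt();
        for (int off = 0, chunk = 0; off < data.length; off += bytesPerChecksum, chunk++) {
            CRC32 crc = new CRC32();
            crc.update(data, off, Math.min(bytesPerChecksum, data.length - off));
            if (in.readInt() != (int) crc.getValue()) {
                return chunk;
            }
        }
        return -1;
    }
}
```

With 4-byte chunks, changing one byte in the third chunk of an 11-byte buffer makes firstBadChunk return 2, while the first two chunks still verify. Checksumming per chunk is what lets a reader validate an arbitrary range without rereading the whole block.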

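The BlockReaderLocal.newBlockReader code above relies on the BlockLocalPathInfo returned earlier, but the block may have been deleted in the meantime. To delete a block, the Namenode instructs the Datanode to remove it; if that happens within this window, the cached BlockLocalPathInfo is stale (the file no longer exists). The code then throws an exception inside the try block, the IOException is caught in the catch block, and the stale block-to-BlockLocalPathInfo mapping is removed from the cache.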
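The caching and invalidation just described can be sketched with a plain map. In this hypothetical model, `LocalPathCache`, the `PathResolver` interface and the `long` block ids are illustrative stand-ins for LocalDatanodeInfo, the getBlockLocalPathInfo RPC and Block. A cache miss triggers one lookup, and an IOException while opening the cached path evicts the stale entry so that the next attempt resolves the path again.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the RPC that asks the local Datanode for a block's path.
interface PathResolver {
    String resolve(long blockId) throws IOException;
}

class LocalPathCache {
    private final Map<Long, String> cache = new HashMap<>();
    private final PathResolver resolver;

    LocalPathCache(PathResolver resolver) { this.resolver = resolver; }

    // Returns the cached path, resolving and caching it on a miss.
    synchronized String pathFor(long blockId) throws IOException {
        String path = cache.get(blockId);
        if (path == null) {
            path = resolver.resolve(blockId);
            cache.put(blockId, path);
        }
        return path;
    }

    // Opens the block file; if it has vanished (e.g. the block was deleted
    // after its path was cached), drop the stale mapping and rethrow.
    FileInputStream open(long blockId) throws IOException {
        String path = pathFor(blockId);
        try {
            return new FileInputStream(new File(path));
        } catch (IOException e) {
            synchronized (this) { cache.remove(blockId); }
            throw e;
        }
    }

    synchronized boolean isCached(long blockId) { return cache.containsKey(blockId); }
}
```

Evicting on failure, rather than validating on every hit, keeps the common path to a single map lookup and pays the extra RPC only after a deletion.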

If the client is not a cluster Datanode, read the block remotely

If the client is not on the Datanode's host, it must read over the network, starting by creating a socket connection:

s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);
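Outside Hadoop, the same connect-then-set-read-timeout pattern needs only java.net. In the sketch below, `DatanodeConnector` and `connectWithTimeouts` are hypothetical names, and the local-interface binding that NetUtils.connect performs is omitted. The connect timeout bounds the TCP handshake, while setSoTimeout bounds each later blocking read.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class DatanodeConnector {
    // Connects with a bounded handshake time, then bounds every blocking read.
    static Socket connectWithTimeouts(InetSocketAddress target, int timeoutMs) throws IOException {
        Socket s = new Socket();
        try {
            s.connect(target, timeoutMs);   // fails if the handshake takes longer
            s.setSoTimeout(timeoutMs);      // later reads throw SocketTimeoutException
            return s;
        } catch (IOException e) {
            s.close();                      // do not leak the socket on failure
            throw e;
        }
    }

    // A local ServerSocket on an ephemeral port stands in for a Datanode.
    static int demo() throws IOException {
        try (ServerSocket server = new ServerSocket(0);
             Socket s = connectWithTimeouts(
                     new InetSocketAddress("127.0.0.1", server.getLocalPort()), 1000)) {
            return s.isConnected() ? s.getSoTimeout() : -1;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // 1000
    }
}
```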

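The createSocket and NetUtils.connect sequence connects the client to the target Datanode (targetAddr). A remote BlockReader, RemoteBlockReader, is then created to read the block data. While creating RemoteBlockReader, the client first sends a read request to the target Datanode. This request is a header written directly to the socket using the data transfer protocol, not a Hadoop RPC call: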

// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(
    new BufferedOutputStream(NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // version of the client-Datanode data transfer protocol
out.write( DataTransferProtocol.OP_READ_BLOCK ); // operation type: read block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // generation stamp of the block
out.writeLong( startOffset ); // offset within the block at which to start reading
out.writeLong( len ); // number of bytes to read
Text.writeString(out, clientName); // client identifier
accessToken.write(out);
out.flush();
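The header above is a fixed sequence of big-endian fields, so its layout can be reproduced with DataOutputStream alone. In this sketch the class `ReadBlockHeader` is hypothetical, its constant values are placeholders rather than confirmed Hadoop values, and the access token is omitted. The client name is written as a 4-byte length plus UTF-8 bytes, which is a simplification: Hadoop's Text.writeString uses a variable-length integer for the length.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class ReadBlockHeader {
    static final short VERSION = 17;      // placeholder protocol version
    static final byte OP_READ_BLOCK = 81; // placeholder opcode

    static byte[] encode(long blockId, long genStamp, long startOffset,
                         long len, String clientName) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(VERSION);      // 2 bytes: protocol version
        out.writeByte(OP_READ_BLOCK); // 1 byte: operation
        out.writeLong(blockId);       // 8 bytes: block ID
        out.writeLong(genStamp);      // 8 bytes: generation stamp
        out.writeLong(startOffset);   // 8 bytes: offset within the block
        out.writeLong(len);           // 8 bytes: number of bytes to read
        byte[] name = clientName.getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);    // simplified length prefix (see lead-in)
        out.write(name);
        out.flush();
        return bos.toByteArray();
    }
}
```

For a client name of 8 ASCII characters, the sketch produces 2 + 1 + 4 × 8 + 4 + 8 = 47 bytes.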

It then obtains a DataInputStream to read the Datanode's response:

DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));

Finally, it returns a RemoteBlockReader object:

return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);

Reading block bytes through the BlockReader

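Back in the read(byte buf[], int off, int len) method: once blockSeekTo has found the Datanode holding the block and created the corresponding BlockReader, data can be read from the block through that BlockReader, which wraps an input stream (InputStream):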

int result = readBuffer(buf, off, realLen);

readBuffer reads up to len bytes of block data into buf (a single byte when called from read()):

private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
  IOException ioe;
  boolean retryCurrentNode = true;
  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return blockReader.read(buf, off, len); // read bytes into buf through the BlockReader
    } catch ( ChecksumException ce ) {
      LOG.warn("Found Checksum error for " + currentBlock + " from " +
          currentNode.getName() + " at " + ce.getPos());
      reportChecksumFailure(src, currentBlock, currentNode);
      ioe = ce;
      retryCurrentNode = false; // do not retry a node that served corrupt data; it is added to the client's dead-node list below
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        LOG.warn("Exception while reading from " + currentBlock +
            " of " + src + " from " + currentNode + ": " +
            StringUtils.stringifyException(e));
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode); // add to the client's dead-node list
      sourceFound = seekToNewSource(pos); // reading from the current Datanode failed, so pick another one; seekToNewSource calls blockSeekTo internally
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}
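The retry policy of readBuffer can be isolated into a small, runnable model. Everything here is hypothetical (`ReplicaReader`, `RetryingReader`, string node names, and a custom `ChecksumError` standing in for ChecksumException). It keeps only the control flow: an ordinary IOException earns one retry on the same node, while a checksum failure, or any failure after that retry, marks the node dead and moves on to the next live replica.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a BlockReader bound to one replica.
interface ReplicaReader {
    int read(byte[] buf, int off, int len) throws IOException;
}

// Hypothetical marker for a checksum mismatch.
class ChecksumError extends IOException {
    ChecksumError(String msg) { super(msg); }
}

class RetryingReader {
    final Set<String> deadNodes = new HashSet<>();
    private final List<String> nodes;          // replicas in preference order
    private final List<ReplicaReader> readers; // readers.get(i) reads from nodes.get(i)
    private int current = 0;                   // index of the replica in use

    RetryingReader(List<String> nodes, List<ReplicaReader> readers) {
        this.nodes = nodes;
        this.readers = readers;
    }

    int readBuffer(byte[] buf, int off, int len) throws IOException {
        boolean retryCurrentNode = true;
        while (true) {
            IOException ioe;
            try {
                return readers.get(current).read(buf, off, len);
            } catch (ChecksumError ce) {
                ioe = ce;
                retryCurrentNode = false;          // corrupt data: never retry this replica
            } catch (IOException e) {
                ioe = e;
            }
            if (!retryCurrentNode) {
                deadNodes.add(nodes.get(current)); // like addToDeadNodes
                int next = nextLiveNode();         // like seekToNewSource
                if (next < 0) {
                    throw ioe;                     // no replica left to try
                }
                current = next;
            }
            // If retryCurrentNode was still true, the loop retries the same
            // replica once (like seekToBlockSource); after that, any failure
            // moves on to another replica.
            retryCurrentNode = false;
        }
    }

    private int nextLiveNode() {
        for (int i = 0; i < nodes.size(); i++) {
            if (!deadNodes.contains(nodes.get(i))) {
                return i;
            }
        }
        return -1;
    }
}
```

The asymmetry is deliberate: a dropped connection is often transient and worth one more try on the nearest replica, whereas a checksum mismatch says the replica itself is bad.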

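Reading block data through BlockReaderLocal and RemoteBlockReader follows very similar logic: the main work is controlling the offset of the bytes being read and recording that offset state. See their source code for details. (Original article by Shi Yanjun (時延軍), http://shiyanjun.cn)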
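The offset bookkeeping both readers share can be illustrated with a toy reader over an in-memory block. `RangeBlockReader` is hypothetical and skips checksums and packets; it only serves bytes from [startOffset, startOffset + length) of the block, advances its position after each read, and returns -1 once the requested range is exhausted.

```java
// Hypothetical reader that serves one byte range of an in-memory block.
class RangeBlockReader {
    private final byte[] block;
    private final long endOffset; // exclusive end of the requested range
    private long pos;             // next offset within the block to return

    RangeBlockReader(byte[] block, long startOffset, long length) {
        if (startOffset < 0 || length < 0 || startOffset + length > block.length) {
            throw new IllegalArgumentException("range outside block");
        }
        this.block = block;
        this.pos = startOffset;
        this.endOffset = startOffset + length;
    }

    int read(byte[] buf, int off, int len) {
        if (pos >= endOffset) {
            return -1;            // requested range fully consumed
        }
        int n = (int) Math.min((long) len, endOffset - pos);
        System.arraycopy(block, (int) pos, buf, off, n);
        pos += n;                 // remember where the next read starts
        return n;
    }
}
```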

How the DataNode handles a block read request

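Now let's look at the DataNode side and how it handles a request to read a block. If the client and the DataNode are not on the same node, the block is read remotely, and the client first sends a request header, as shown below: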

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // version of the client-Datanode data transfer protocol
out.write( DataTransferProtocol.OP_READ_BLOCK ); // operation type: read block
out.writeLong( blockId ); // block ID
out.writeLong( genStamp ); // generation stamp of the block
out.writeLong( startOffset ); // offset within the block at which to start reading
out.writeLong( len ); // number of bytes to read
Text.writeString(out, clientName); // client identifier
accessToken.write(out);
out.flush();

After verifying that the data transfer version (DataTransferProtocol.DATA_TRANSFER_VERSION) matches, the DataNode checks the operation type. For a read (DataTransferProtocol.OP_READ_BLOCK), it creates an OutputStream on the socket the client established and sends the block data to the client through a BlockSender:

try {
  blockSender = new BlockSender(block, startOffset, length,
      true, true, false, datanode, clientTraceFmt); // create the BlockSender
} catch(IOException e) {
  out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
  throw e;
}

out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send a response header: success status
long read = blockSender.sendBlock(out, baseStream, null); // send the requested block data
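Seen from the DataNode, handling this request amounts to: parse the header, reject an unknown version, dispatch on the opcode, write a status code, then stream the requested range. The sketch below models that over in-memory streams. `ReadBlockHandler`, its constant values (including the status codes) and the simplified client-name encoding are hypothetical placeholders; there is no BlockSender, packetization, checksum or token check here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class ReadBlockHandler {
    static final short VERSION = 17;       // placeholder protocol version
    static final byte OP_READ_BLOCK = 81;  // placeholder opcode
    static final short STATUS_SUCCESS = 0; // placeholder status codes
    static final short STATUS_ERROR = 1;

    // Client side: build a simplified request (token omitted).
    static byte[] request(short version, long startOffset, long len) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(version);
        out.writeByte(OP_READ_BLOCK);
        out.writeLong(1L);            // blockId
        out.writeLong(1000L);         // generation stamp
        out.writeLong(startOffset);
        out.writeLong(len);
        byte[] name = "client-1".getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.flush();
        return bos.toByteArray();
    }

    // DataNode side: returns what would be written back on the client's socket.
    static byte[] handle(byte[] request, byte[] blockData) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        if (in.readShort() != VERSION) {
            throw new IOException("Version mismatch");
        }
        byte op = in.readByte();
        if (op != OP_READ_BLOCK) {
            throw new IOException("Unknown opcode " + op);
        }
        in.readLong();                // blockId (a real DataNode would look up the replica)
        in.readLong();                // generation stamp
        long startOffset = in.readLong();
        long len = in.readLong();
        in.skipBytes(in.readInt());   // client name, unused here
        if (startOffset < 0 || len < 0 || startOffset + len > blockData.length) {
            out.writeShort(STATUS_ERROR);   // analogous to OP_STATUS_ERROR
        } else {
            out.writeShort(STATUS_SUCCESS); // analogous to OP_STATUS_SUCCESS
            out.write(blockData, (int) startOffset, (int) len);
        }
        out.flush();
        return bos.toByteArray();
    }
}
```

Writing the status before any data mirrors the order in the DataNode code: the client reads the 2-byte status first and only then starts consuming block bytes.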


When reposting, please include the URL: http://www.tjfsxbj.com/newstwo/109474.html
