HDFS Architecture - Introduction to the Hadoop Distributed File System

This article describes the main features of the Hadoop Distributed File System (HDFS) and how the HDFS architecture behaves in certain scenarios. HDFS is part of Apache Hadoop.

1. What is HDFS?

HDFS is a system for storing huge files on a cluster of servers, whereby the number of servers is hidden by HDFS. Thus, when you use an HDFS client to store a file in an HDFS cluster, it almost feels like you are writing the file to a local file system. Behind the scenes the file is split up into equally sized blocks, which are then stored on different machines. The default size of these blocks is 64 MB. One obvious advantage of this behaviour is the ability to store files bigger than any disk of a single server.

Splitting files into equally sized blocks is the key tactic in HDFS that makes it easier to achieve scalability, performance, availability, data integrity and reliability. To guarantee these quality attributes HDFS comes with a few limitations, which may not be present in a local file system:

  • data can only be appended to a file, not modified at an arbitrary position,
  • only one client at a time may write to a file (see "Write Leases"),
  • written data may not become visible to readers before the file is synced or closed (see "Synchronize Written Data").

HDFS is an integral part of the Hadoop framework and so it is built for Big Data applications. You can find more information regarding the history of Hadoop and HDFS in [07].

2. When to Use and Not Use HDFS?

HDFS is a good fit when it comes to storing large amounts of data. Often the data in HDFS is used for analytics batch jobs. You should use HDFS when:

  • you have to store very large files or a large overall amount of data,
  • your workload mostly writes data once and reads it often (e.g. batch analytics),
  • throughput matters more to you than low latency.

You should NOT use HDFS when you are in one of the following situations:

  • you need low-latency access to your data,
  • you have to store a large number of files that are smaller than the block size,
  • several clients have to write to the same file at the same time, or you need random writes within a file.

3. HDFS Use Cases and Clients

Figure 2.1 shows the main HDFS features as a use case diagram. Each use case is described in more detail in chapter 5, where you will also find the communication flows through the system and its components.

HDFS Use Cases Overview
Fig 2.1. - An overview of the main HDFS use cases.

Figure 2.1 shows that there are several ways to interact with HDFS. Human actors can use an existing command line client for the interaction, but most of the time HDFS is used by other applications, so an official Java library (API) exists. There is also a C library to access all HDFS functions from applications written in C. The Java and C libraries communicate through an RPC protocol. It is also possible to configure HDFS to provide a REST API for all interactions (see WebHDFS). Using the REST API makes it quite easy to interact with HDFS from any programming language. Another way to communicate with HDFS is to mount HDFS as a filesystem in userspace (FUSE). This is possible using the Fuse-DFS module.
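
As a small illustration of the official Java library mentioned above, the following sketch connects to a cluster and lists the root directory. The namenode URI is a placeholder; host and port depend on your setup.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class HdfsClientExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Obtain a client handle; all interactions go through this facade.
        // "hdfs://namenode:8020" is an example address, not a fixed value.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        // List the root directory, just like on a local file system.
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}
```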

4. HDFS Components and Responsibilities

The core of HDFS is a composition of two types of components. To achieve some specific non-functional goals other components exist, which will be introduced later. The two basic components are subsystems that run as separate processes. In production setups these components even run on different servers (that's why it's called a distributed file system). To understand how these components work together, remember that each file stored in HDFS is split into blocks, which are then distributed across multiple servers. The detailed communication flows between these components are discussed in the next chapter. For now, we will just introduce the main responsibilities of these two components.

HDFS, namenode and datanode Use Cases/Responsibilities
Fig 4.1. - Responsibilities of the name node and data nodes.

The namenode can be considered the brain of HDFS. This component knows what the directory structure looks like, how the access rights to each file and directory are configured, which users exist, and where each block of each file is stored. All this information is referred to as the "namespace".

The datanodes store the blocks of the files you put into HDFS. The default size of one block is 64 MB, but it can be configured for each file at the time the file is created. Eventually, each block ends up as a regular file on the local filesystem of one of the datanodes (these files are called "block files"). To handle an increasing number of blocks, more than one datanode can exist in an HDFS setup.

5. Primary Use Cases in Detail

This chapter describes, for each use case, the communication flow through the system, the scenarios that can occur during execution, and how the HDFS architecture behaves in each of these scenarios.

Goal: Manage a Directory Structure

Managing a directory structure is a fundamental functionality of a filesystem. This includes: creating directories and files, assigning access rights to files and directories, and deleting files and directories.

The main communication flow looks like this:

  1. The client performs an RPC call on the namenode to initiate the directory creation or another manipulation of the directory structure.
  2. The namenode checks whether the directory already exists and whether the client has the rights to change the directory structure.
  3. The namenode creates an entry for the directory. For now, this entry is saved to a file called the journal (see the sketch below).
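
A minimal sketch of this flow from the client's perspective, assuming a FileSystem handle has already been obtained (for example as shown in chapter 3); the path and permissions are just examples:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class MkdirExample {
    // Triggers the RPC call of step 1; the permission check (step 2) and
    // the journal entry (step 3) happen inside the namenode.
    static void createDirectory(FileSystem fs) throws Exception {
        fs.mkdirs(new Path("/customers/company-name"), new FsPermission((short) 0755));
    }
}
```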

Goal: Create Files and Insert Any Amount of Data

In this use case a client application wants to write data to a file that doesn't exist yet on HDFS. As a developer of a client application you will probably use the HDFS Java API, which makes file creation on HDFS quite simple. Hence, as an application developer you don't have to deal with the details below; you just have to write a few lines of code.

Anti Goals:

Communication Flow: The most common communication flow through the system shows what's going on behind the scenes when creating a file:

The communication flow when saving a file to HDFS
Fig. Responsibilities when saving a file to HDFS with a replication factor of three
  1. In the first step the client application calls the namenode to initiate the file creation. Remember that, in a later step, HDFS will divide the file content into equally sized blocks, which are then distributed across several datanodes. Although the size of all blocks of one file is the same, each file can have a different block size. So, in this step, the client tells the namenode the block size to be used for the file (usually 64 MB). Another parameter that can be specified in this step is the desired replication factor for the file content. The default value for this factor is three, which means that three copies of the file content will be placed on the datanodes. Increasing the replication factor of the file content leads to an increased availability of the data in case one or more datanodes fail. However, it is obvious that increasing the replication factor goes together with higher storage costs.
  2. The namenode checks whether the file already exists and whether the client has the rights to create the file.
  3. The namenode stores the metadata of the file (filename, path, replication factor, block size, access rights). For now, this entry is saved to a file called the journal on the local file system of the namenode. After this step the file is visible to other HDFS clients, although it doesn't contain any content yet.
  4. The client application obtains from the namenode the information on which datanode(s) the first block and its replicas should be stored. Note that neither the client application nor the namenode nor the datanodes need to know how big the overall file will be. At this point HDFS just arranges the storage for the first block and its replicas. In case the chosen replication factor of the file (see step one) is three, the client receives a list of three datanodes. These datanodes are selected and sorted in a way that minimizes the network distance between the datanodes (and the client) to save transmission costs. This list of datanodes is also referred to as the "pipeline".
  5. The client application can start the transmission of the first block right after it has received the datanodes of the pipeline (see step 4). In the end, each listed datanode will hold a copy of the block. However, in this step the block is streamed only to the first datanode in the pipeline. To transmit the block data from the client to the first datanode, the block is split up into even smaller pieces called "packets".
  6. Immediately after the first datanode has received the first packet, it saves the packet to disk and starts to transmit it to the second datanode, which in turn transfers the packet to the third datanode in the pipeline. This continues until the last datanode has received the packet. In our example it ends when the third datanode has received the packet.
  7. When the last datanode has received and saved the packet, it acknowledges that the packet transmission succeeded. This acknowledgment is sent to the predecessor datanode that sent the packet. As soon as the predecessor datanode receives the acknowledgment, it sends an acknowledgment to its own predecessor, confirming that all its successor nodes have received the packet. The last node to receive an acknowledgment is the first node in the pipeline, i.e. the one that actually received the packet from the client application. This node then sends an acknowledgment to the client application, and with this the client knows that all datanodes in the pipeline have received a copy of the block.
  8. Note that steps 5, 6 and 7 are executed in parallel: the client starts to send further packets even if the previous packet transmission hasn't been acknowledged yet. When the client has sent enough packets to fill up the first block, it asks the namenode for a new list of datanodes to store the next block of the file. With the new list of datanodes the client application repeats steps 5, 6 and 7.
  9. Once the client has sent all its data to the datanodes, it closes the file. This operation ensures that the full content of the file is visible to clients that read the file. A minimal client-side sketch of this use case follows below.
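
The following sketch shows how little of this machinery is visible to the application developer. The block size, replication factor and path are example values; the create() overload used here is part of the official FileSystem API.

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteExample {
    static void writeFile(FileSystem fs) throws Exception {
        short replication = 3;               // replication factor (step 1)
        long blockSize = 64L * 1024 * 1024;  // 64 MB block size (step 1)
        // create() performs steps 1-3; the pipeline of steps 4-8 runs
        // transparently while we write.
        FSDataOutputStream out = fs.create(
                new Path("/data/events.log"), true, 4096, replication, blockSize);
        out.write("some file content\n".getBytes("UTF-8"));
        out.close();                         // step 9: makes the content visible
    }
}
```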

Goal: Append Content to an Existing File

Anti Goals:

Writing data to an existing file is almost the same process as writing data to a new file (see "Create Files and Insert Any Amount of Data"). There are two differences:

  • In steps one, two and three the namenode won't create the file, since it already exists. The namenode just checks whether the client is authorized to perform write operations on the existing file.
  • Since the file already exists and some data is already stored in it, the datanodes already hold some blocks associated with this file. It is very likely that the last saved block of the file isn't completely filled with data. In this case the client application obtains a list of the datanodes storing this last block. The client application then writes packets to the already existing block until it is filled up to the block size of the file. Only when the existing block is filled up does the namenode arrange a new list of datanodes to store the following data (see the sketch below).
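
A hedged sketch using the append() method of the FileSystem API; note that on older Hadoop versions appends have to be enabled on the cluster (dfs.support.append), and the path is an example:

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendExample {
    static void appendToFile(FileSystem fs) throws Exception {
        // The library locates the last, partially filled block for us.
        FSDataOutputStream out = fs.append(new Path("/data/events.log"));
        out.write("more content\n".getBytes("UTF-8"));
        out.close();
    }
}
```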

Goal: Efficient Access to a Huge Amount of File Content

Accessing huge amounts of data is one of the major use cases of HDFS. With the Java HDFS library, the complexity of accessing file content is hidden from the application developer. Thanks to the distribution of the file blocks across different nodes, HDFS can handle a large number of parallel file reads. Due to the block replication, there are multiple nodes able to serve the content of the same block. Hence, even parallel accesses to the same file cause no performance problems.

Another important concept when accessing data on HDFS is the awareness of where the data is located. The idea behind this is that HDFS minimizes the distance data has to be transferred. This is described in more detail in the communication flow for this use case (see below). Using the HDFS API, it is even possible to find out where the file data resides and to move the client application next to the data. This means executing the client application logic on a datanode so that the file block content doesn't have to be transferred over the network at all. The concept of moving the computation next to the data is described in the chapter "data locality". The sketch below shows how a client can query the block locations.
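
A small sketch of this idea, assuming an existing FileSystem handle; it asks the namenode for the hosts that hold replicas of each block of an example file:

```java
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalityExample {
    static void printBlockLocations(FileSystem fs) throws Exception {
        FileStatus status = fs.getFileStatus(new Path("/data/events.log"));
        // One BlockLocation per block of the file, covering its whole length.
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            // Hosts that hold a replica of this block; a scheduler could
            // place the computation on one of them.
            System.out.println(String.join(", ", block.getHosts()));
        }
    }
}
```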

Anti Goals:

  • It is not guaranteed that written data is visible to other client applications right after it was written by the writing client. There is a sync method to ensure that the written data becomes available to other clients.
  • HDFS is optimized for high-throughput data access. The trade-off for this optimization is a higher latency when accessing data. In contrast to other database solutions (e.g. PostgreSQL) it takes much longer until the first piece of data arrives at the client application. This is partly because the client application first has to contact the namenode before it can actually request data from a datanode.

Communication Flow: The figure below shows the communication between the HDFS components when reading the content of a file.

The communication flow when reading file content on HDFS
Fig. Responsibilities when reading file content from HDFS
  1. The client asks the namenode where it can find copies of the first few blocks of a file. Note that only the locations of a few blocks are transferred; to get the locations of the following blocks, the client has to ask the namenode again. The namenode returns a list of datanodes for each block. Each of these datanodes holds a copy of the block, and the list of datanodes is sorted by their network distance from the client.
  2. The client application then chooses the first datanode from the list of datanodes where the first block is stored. The client chooses the first datanode because this datanode is closest to the client; "close" means that the client and the datanode process run in the same data center, in the same rack, or even on the same machine. In case the first datanode is not available, the client can pick the second datanode from the list. The client application then initiates the block transmission from the datanode to the client. In the scenario that the client application is executed on the same machine as a datanode, it is possible for the client to read the block content directly from the local filesystem, without transferring the block data through the datanode process. This tactic is called Short-Circuit Local Reads. A minimal read sketch follows below.
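
As with writing, the read-side details stay hidden behind the library. A minimal sketch, with an example path:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ReadExample {
    static void readFile(FileSystem fs) throws Exception {
        // open() triggers the namenode lookup of step 1; the datanode
        // selection of step 2 happens transparently while reading.
        FSDataInputStream in = fs.open(new Path("/data/events.log"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
    }
}
```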

6. Tactics Used to Achieve Qualities

Quality      | Quality Scenario (affected use cases are linked)                        | Tactics
Availability | The namenode fails during a file write operation                        | Hot Standby
Availability | The namenode fails just like that                                       | Hot Standby
Availability | A datanode fails during a file read operation                           |
Availability | A datanode fails during a file write operation                          |
Availability | A datanode fails just like that                                         | Block File Replication, File Partitioning, Heartbeats
Availability | A network failure causes a network partition (split brain)              |
Availability | A datacenter fails                                                      |
Scalability  | The file create frequency increases                                     | File Partitioning, Namenode Federations
Scalability  | The number of created files increases over time                         | Namenode Federations, Journaling, File Partitioning, Multiple Datanodes
Scalability  | A file of any size is being created                                     | File Partitioning, Multiple Datanodes
Scalability  | A file smaller than its block size is being created                     | Not Optimized
Scalability  | The number of created files smaller than their block size increases over time | Not Optimized
Scalability  | A file of any size is being read                                        | Namenode Federations, File Partitioning, Multiple Datanodes, Data Locality
Coherency    | A client reads a file while another client writes data to this file    | Sync Method
Consistency  | A client tries to write data into a file while another client writes data into the same file | Write Leases
Integrity    | The content of a file gets corrupted when writing (during transmission to a datanode) | Checksums
Integrity    | A data block gets corrupted due to bit rot on a datanode                | Checksums, Periodical Checksum Scanning
Performance  | A client writes a file                                                  | Client Side Compression, Datanode Distance Minimization
Performance  | A client reads a file                                                   | Seek Time Optimization, Short-Circuit Local Reads, Data Locality
Reliability  | The network connection is interrupted while a file is being written or read | Packet Streaming
Latency      | A client reads a file                                                   | Not Optimized

Journaling and Checkpointing

The namenode is responsible for saving the metadata of the HDFS cluster. This includes the directory tree, the filenames, the access rights and the IDs of the blocks of each file. As already mentioned, this data is called the namespace. The namenode holds the whole namespace in its RAM, which makes managing the directory structure a fast operation. Performance is important at this point since the namenode can't be scaled as arbitrarily as a stateless application (see "Namenode Federations" for information on how to scale the namenode).

Nevertheless, the namespace has to be made persistent, since the whole HDFS cluster is useless when this information is lost. For this, each action that modifies the namespace is written to a file called the journal (sometimes also called the "edit log"). Writing the actions to the journal doesn't require changing existing data on disk; instead, data is simply appended to the end of the journal file. Appending data to the end of a file is a much faster operation than searching for existing data and manipulating it in the middle of a file. For durability reasons, the namenode confirms a client request (file creation, directory creation, ...) only when the operation was successfully recorded to the journal.

The namespace can be reconstructed from the actions recorded in the journal file. Since the namespace is stored in the RAM of the namenode, it has to be reconstructed from the journal every time the namenode restarts. Depending on the number of actions recorded in the journal file, this is a time-consuming process. Another problem is that the journal file grows fast and could eat up the disk space on the namenode. To solve these problems a checkpoint is performed every time the namenode starts. This means the current namespace is reconstructed from the journal, the result is saved to another file called the fsimage, and the journal file is cleared. The next time a checkpoint is performed, it is done using the old fsimage file and the newly recorded actions from the journal file. The sketch below illustrates this append-and-checkpoint idea.
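
The following is an illustrative sketch of the append-and-checkpoint idea, not HDFS's actual implementation; the file names merely mirror the journal/fsimage terminology used above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class JournalSketch {
    private final Path journal = Paths.get("journal.log");
    private final Path fsimage = Paths.get("fsimage");

    // Appending is cheap: no existing data on disk has to be modified.
    void record(String action) throws IOException {
        Files.write(journal, (action + "\n").getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Checkpoint: fold the journal into the snapshot (here simplified to
    // concatenation), then clear the journal so it cannot grow unboundedly.
    void checkpoint() throws IOException {
        StringBuilder state = new StringBuilder();
        if (Files.exists(fsimage)) {
            state.append(new String(Files.readAllBytes(fsimage)));
        }
        if (Files.exists(journal)) {
            state.append(new String(Files.readAllBytes(journal)));
        }
        Files.write(fsimage, state.toString().getBytes());
        Files.write(journal, new byte[0]); // truncate the journal
    }
}
```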

Hot Standby Failover for the Namenode

As the namenode is the only component that knows which blocks belong to which file in which order, it is obvious that the HDFS cluster can't be used without the namenode. So, in case the namenode fails, it is no longer possible to manage the directory tree, to write data into the cluster or to read any data from the cluster.

To increase the availability of an HDFS cluster, a second namenode can be configured as a failover for the first one. To prevent a so-called split-brain situation between the two namenodes, only one namenode is responsible for accepting requests from client applications. The second namenode just gets informed about each action, and so this node is referred to as the passive node. This behaviour prevents the data from diverging between the two namenodes.

HDFS provides two different setups to replicate the changes from the active to the passive namenode:

Using a Network File System (NFS) to achieve high availability

The active namenode logs each client operation into the journal as usual. In this setup the journal file is saved into a shared directory, and via NFS the passive namenode also has access to this directory over the network. More information about the NFS setup to achieve high availability can be found in the official Hadoop documentation.

Using Quorum Journal Manager to achieve high availability

Using this tactic, the active namenode additionally writes each client operation to a cluster of nodes called journalnodes. The passive namenode constantly observes the journalnode cluster to keep its own state in sync with the active namenode. The official Hadoop documentation also provides more information about achieving availability through journalnodes.

Namenode Federations

To scale the namenode horizontally, it is not possible to simply add additional namenode instances and randomly distribute the client requests among them. This is because the namenode holds its data in RAM; distributing write requests to different namenode instances would cause a split-brain problem, and the data between the namenode instances would diverge. Further, read operations could return inconsistent values, or, to prevent this, write operations would become expensive to ensure consistency.

To achieve consistent read operations and effective write operations, the HDFS architecture provides a mechanism called namenode federations for horizontal scalability. Using this tactic, each namenode is responsible for managing a subtree of the whole directory structure. For example: namenode instance #1 is responsible for managing all requests which affect files and directories below the path "customers/company-name". The distribution of the requests to the right namenode is done by the client applications themselves. For this, each client obtains a mount table to determine which namenode is responsible for a file or directory (see the sketch below).
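
An illustrative sketch of the client-side mount table idea (the actual mechanism in Hadoop is ViewFs, whose configuration differs): the client resolves a path to the namenode with the longest matching prefix. The prefixes and addresses are examples.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MountTableSketch {
    // Path prefix -> responsible namenode; the entries are examples.
    private final Map<String, String> mounts = new LinkedHashMap<>();

    MountTableSketch() {
        mounts.put("/customers/company-name", "hdfs://namenode1:8020");
        mounts.put("/logs", "hdfs://namenode2:8020");
    }

    // Pick the namenode responsible for the longest matching path prefix.
    String resolve(String path) {
        String best = null;
        for (String prefix : mounts.keySet()) {
            if (path.startsWith(prefix) && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best == null ? null : mounts.get(best);
    }
}
```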

Another advantage of namenode federations is that they can be used to isolate namenodes against load. For example: when different subdirectories are used to store the data of different customers, the namespace data (metadata) can be partitioned across different namenodes. In case one customer executes many operations on its namenode and consumes all of that namenode's computation power, the other customers are not affected.

A disadvantage of this tactic is that it has to be ensured that the majority of the files are not saved under the same directory; otherwise the namenode responsible for this directory becomes a bottleneck.

Write Leases

Whenever a client application tries to write data into an existing file on HDFS, the namenode checks whether another client is currently writing data to the same file. If this is the case, the client gets rejected (reading the file content is still possible). The rejected client can then try to write the data at another time or to another file. This behaviour is the cheapest way to prevent conflicting writes and ensure integrity.

In case no other client is writing to the file, the namenode grants write access to the client application. With this grant the client obtains a lease, which expires within a short time period (usually one minute). From now on, write requests of other clients are rejected as long as the lease hasn't expired and the writing client application hasn't closed the file. The writing client application can refresh the lease as long as it has not expired; the refresh resets the lease expiration time. The sketch below illustrates this mechanism.
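
An illustrative sketch of the lease mechanism (not HDFS's internal code); the one-minute duration follows the description above.

```java
public class LeaseSketch {
    private static final long LEASE_DURATION_MS = 60_000; // "usually one minute"
    private String holder;   // client currently allowed to write
    private long expiresAt;  // timestamp at which the lease expires

    // Grant write access if no other client holds a valid lease.
    synchronized boolean tryAcquire(String client, long now) {
        if (holder == null || now >= expiresAt || holder.equals(client)) {
            holder = client;
            expiresAt = now + LEASE_DURATION_MS;
            return true;     // write access granted
        }
        return false;        // another client holds a valid lease
    }

    // Refreshing resets the expiration time, as described above.
    synchronized boolean refresh(String client, long now) {
        if (client.equals(holder) && now < expiresAt) {
            expiresAt = now + LEASE_DURATION_MS;
            return true;
        }
        return false;        // lease already expired or held by someone else
    }
}
```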

Synchronize Written Data (sync method provided by client libraries)

HDFS doesn't have a strong coherency model. This means that data written to the current block is not guaranteed to be visible to other clients that read the content of the file. Only when the writing client has concluded its write operation and closed the file is the new content guaranteed to be visible. To make the currently written content visible to other clients without closing the file, the client can invoke the sync method provided by the common HDFS libraries, as sketched below.
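
A minimal sketch using hflush() on the output stream; depending on the Hadoop version this method may be called sync() instead, and the path is an example.

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SyncExample {
    static void writeAndSync(FileSystem fs) throws Exception {
        FSDataOutputStream out = fs.create(new Path("/data/live.log"));
        out.write("first record\n".getBytes("UTF-8"));
        out.hflush(); // readers can now see "first record" without close()
        out.write("second record\n".getBytes("UTF-8"));
        out.close();
    }
}
```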

Distribution of File Data Across Multiple Datanodes

TBD

Block File Replication

TBD

Heartbeats

TBD

Checksums

TBD

Data Locality

TBD

Periodical Checksum Scanning

TBD

Client Side Compression

TBD

Short-Circuit Local Reads

TBD

Seek Time Optimization

TBD

Packet Streaming

TBD

8. External Sources