DataOps using a distributed system (Hadoop) for storing and processing data in a centrally managed, distributed manner

As we know, the Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. Hadoop runs on commodity hardware and is written entirely in Java. It is robust, self-healing, and resilient, and it remains one of the most widely used Big Data analysis platforms today. Hadoop’s core components are HDFS and MapReduce: HDFS handles storage and MapReduce handles batch processing. For real-time processing we have tools like Spark, which is known for fast in-memory parallel processing. Hadoop also has built-in mechanisms for backup and disaster recovery, and its default replication factor of 3 makes it very resilient. HDFS runs daemons called the Namenode, Secondary Namenode, and Datanode. The Namenode manages the filesystem namespace, the Secondary Namenode periodically checkpoints the namespace into a persistent fsimage file, and the Datanodes store the actual data in blocks. The default block size is 128 MB in recent Hadoop versions, though clusters are often configured for 256 MB. Assuming a 256 MB block size, a 1 GB file is written as 4 blocks.
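As a quick sanity check of that block arithmetic, here is a minimal Java sketch using the standard Hadoop FileSystem API (the path /user/hduser/bigfile.dat is just a hypothetical example) that prints a file's block size and the number of blocks it occupies:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml / hdfs-site.xml from the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical file path, used only for illustration
        Path file = new Path("/user/hduser/bigfile.dat");
        FileStatus status = fs.getFileStatus(file);

        long blockSize = status.getBlockSize();             // e.g. 256 MB if so configured
        long length = status.getLen();                      // e.g. 1 GB
        long blocks = (length + blockSize - 1) / blockSize; // 1 GB / 256 MB = 4 blocks

        System.out.println("Block size : " + blockSize);
        System.out.println("File length: " + length);
        System.out.println("Blocks     : " + blocks);

        fs.close();
    }
}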

 

Anatomy of a File Write to HDFS:

When we write this 1 GB file to HDFS, the request goes to the Namenode first. The Namenode creates the metadata for the file and returns block locations to the client. The data is then written block by block through a pipeline of Datanodes: the first copy of block 1 is written to one Datanode, which forwards it so the second copy is written to a second Datanode, which in turn forwards it so the third copy is written to a third Datanode. Once the third copy is written, an acknowledgment travels back up the pipeline to the client, and a new pipeline is created to write block 2; the process then repeats exactly as it did for block 1. The FSDataOutputStream class is responsible for writing data to HDFS. If a Datanode in the pipeline fails during the write and the process cannot move forward, the pipeline is torn down, the failure is reported to the Namenode, the Namenode supplies a replacement Datanode, and the write continues with a new pipeline.
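For reference, a client-side write through the FSDataOutputStream mentioned above looks roughly like the sketch below. The path and the sample payload are hypothetical, and it assumes the Hadoop configuration files are on the classpath.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // fs.create() asks the Namenode for block locations and returns an
        // FSDataOutputStream that drives the write pipeline described above.
        Path target = new Path("/user/hduser/sample.txt"); // hypothetical path
        try (FSDataOutputStream out = fs.create(target, true)) {
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8)); // streamed block by block
        } // close() waits for the final acknowledgments from the pipeline

        fs.close();
    }
}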

Anatomy of a File Read from HDFS:

When we want to read data from HDFS, the request again goes to the Namenode first. The Namenode checks the file permissions for that particular user and, if the user is allowed, returns the block locations to the client. The data is then fetched in parallel from Datanodes across the cluster. The FSDataInputStream class is responsible for reading data from HDFS.
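And the matching read path, again only a minimal sketch with a hypothetical file path:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // fs.open() fetches block locations from the Namenode and returns an
        // FSDataInputStream that reads the blocks from the closest Datanodes.
        Path source = new Path("/user/hduser/sample.txt"); // hypothetical path
        try (FSDataInputStream in = fs.open(source);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }

        fs.close();
    }
}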

Creating Data Pipelines:

To process data, we first have to make it available on HDFS. To do that, we create data pipelines that fetch data from a source and write it to a destination. A data pipeline is a set of processes that enables the efficient flow of data from any source to any destination. There are many ways to create data pipelines in this era, but we will discuss the three approaches most commonly used in production:

  1. Creating a Data Pipeline using DistCp.

  2. Creating a Data Pipeline using Kafka.

  3. Creating a Data Pipeline from S3 to HDFS and vice versa using the command line.

Let’s take a deep dive into each one.

Creating a Data Pipeline using DistCp

We can create data pipelines using DistCp. DistCp uses map tasks behind the scenes, so data is transferred in parallel across the cluster; there are no reducers. Each file is copied by a single map, and DistCp tries to give each map approximately the same amount of data by bucketing files into roughly equal allocations. By default, up to 20 map tasks are used, but this can be changed by passing the -m argument to DistCp. One use for DistCp is as an efficient replacement for hadoop fs -cp. For example, you can copy one file to another with:

$ hadoop distcp file1 file2

You can also copy directories:

$ hadoop distcp dir1 dir2

If dir2 does not exist, it will be created (DistCp runs as a MapReduce job, and MapReduce creates its output directory itself), and the contents of dir1 will be copied into it. If dir2 already exists, then dir1 will be copied under it, creating the directory structure dir2/dir1. If this isn’t what you want, you can supply the -overwrite option to keep the same directory structure and force files to be overwritten. You can also update only the files that have changed using the -update option. This is best shown with an example: if we changed a file in the dir1 subtree, we could synchronize the change with dir2 by running:

$ hadoop distcp -update dir1 dir2

Even for a single file copy, the DistCp variant is preferred for large files, since hadoop fs -cp copies the file via the client running the command.

A very common use case for DistCp is transferring data between two HDFS clusters. For example, the following creates a backup of the first cluster’s /foo directory on the second:

$ hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo

The -delete flag causes DistCp to delete any files or directories from the destination that are not present in the source, and -p means that file status attributes like permissions, block size, and replication are preserved. You can run DistCp with no arguments to see precise usage instructions.

If the two clusters are running incompatible versions of HDFS, you can use the webhdfs protocol to run DistCp between them:

$ hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

Before running these commands, we have to configure the cluster to support these types of pipelines. Two files need the following properties: core-site.xml and mapred-site.xml.

nano core-site.xml


<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The FileSystem implementation for s3a: URIs.</description>
</property>

 

After entering this property, save.

nano mapred-site.xml
 
<property>
  <!-- Add to the classpath used when running an M/R job -->
  <name>mapreduce.application.classpath</name>
 <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/tools/lib/*</value>
</property>

 

After entering this property, save.

 

We use the following commands for DistCp:

                                                                          

$ hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo
$ hadoop distcp /user/hduser/jinga.xml s3a://<bucket-name>/ 
$ hadoop distcp hdfs:///user/hduser/jinga.xml s3a://<bucket-name>/
$ hadoop distcp s3a://<bucket-name>/ hdfs:///user/hduser/
$ hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

 

Creating a Data Pipeline using Kafka

We can create data pipelines using Kafka, a robust, resilient, distributed, commit-log-based publish-subscribe messaging system that was originally developed at LinkedIn. In Kafka we have a source and a destination, and between them sits the Kafka cluster. Data is pushed from the source into the Kafka cluster through the Producer API, and pulled from the Kafka cluster to the destination through the Consumer API. We can have multiple producers and multiple consumers.
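As a minimal sketch of the two APIs (the broker addresses kf1:9092,kf2:9092,kf3:9092 and the topic my-topic are reused from the console commands later in this section; the key, payload, and consumer group are made up for illustration), a Java producer and consumer look roughly like this:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaPipelineSketch {
    public static void main(String[] args) {
        String brokers = "kf1:9092,kf2:9092,kf3:9092";

        // Producer: pushes data from the source into the Kafka cluster
        Properties p = new Properties();
        p.put("bootstrap.servers", brokers);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("my-topic", "key-1", "some payload"));
        }

        // Consumer: pulls data from the Kafka cluster towards the destination
        Properties c = new Properties();
        c.put("bootstrap.servers", brokers);
        c.put("group.id", "demo-group"); // hypothetical consumer group
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}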

Core Concepts of Kafka:

In Kafka we create topics, and under each topic we can create multiple partitions. Partitions hold the actual messages (data). Producers always write to a topic, not directly to a partition; Kafka assigns each message to one of the topic’s partitions. Data is written to a Kafka cluster that typically contains at least 3 brokers, and each broker hosts some of the partitions of the various topics. The brokers act as bootstrap servers, meaning that if you are connected to one broker you are effectively connected to the entire Kafka cluster. A broker with a good hardware configuration (for example, 32 GB RAM and 4 cores) can handle thousands of partitions and millions of messages per second.
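To make the topic/partition/replication ideas concrete, here is a small sketch that creates a topic with 3 partitions and a replication factor of 3 through Kafka’s AdminClient; the broker addresses are again the ones used in the console commands below.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Connecting to one broker is enough to discover the whole cluster
        props.put("bootstrap.servers", "kf1:9092,kf2:9092,kf3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions spread across the brokers, each kept on 3 replicas
            NewTopic topic = new NewTopic("my-topic", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}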

Architecture of Kafka:

Producers push data from the sources into the Kafka cluster, and consumers pull it out and write it to the destination, typically HDFS. Once the data is available on HDFS, the data science team can process it as per production requirements.

 
 

Kafka broker settings are configured in the server.properties file; we edit this file whenever we need to change how the brokers behave.
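A minimal server.properties might look like the following; the broker id, paths, and hostnames here are assumptions for illustration, not values from an actual setup.

broker.id=1
listeners=PLAINTEXT://kf1:9092
log.dirs=/var/lib/kafka/logs
num.partitions=3
default.replication.factor=3
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka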

Working with Kafka

We run commands to create or list topics and to set up producer/consumer pipelines. The commands below create topics and start a console producer and a console consumer.

 ## Create a topic

kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic my-topic --replication-factor 3 --partitions 3  

##Create a producer

kafka-console-producer.sh --broker-list kf1:9092,kf2:9092,kf3:9092 --topic my-topic
(publish some data)

## Create a consumer

kafka-console-consumer.sh --bootstrap-server kf1:9092,kf2:9092,kf3:9092 --topic my-topic --from-beginning

## Create another topic

kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic my-topic2 --replication-factor 3 --partitions 3 

## List topics

kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list


Creating a Data Pipeline from S3 to HDFS and vice versa

If we are using a cloud service like AWS and the data is stored in S3, we have to create data pipelines to fetch the data from S3 and write it to HDFS. To build this type of pipeline, we first configure core-site.xml so that HDFS clients can access S3. Once the cluster is configured, we can create data pipelines from S3.

To configure core-site.xml with the required properties, add the following (replace the placeholder values with your own AWS credentials):

nano core-site.xml
 
<property>
  <name>fs.s3a.access.key</name>
  <description>AWS access key ID used by the S3A file system.</description>
  <value><your-aws-access-key-id></value>
</property>
 
<property>
  <name>fs.s3a.secret.key</name>
  <description>AWS secret key used by the S3A file system.</description>
  <value><your-aws-secret-access-key></value>
</property>
 

After entering these two properties, save.

nano hadoop-env.sh
 export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/*

Commands to create data pipelines from S3 to HDFS or vice versa:

hadoop fs -cp s3a://<bucket-name>/<file-name> hdfs:///user/hduser/
hdfs dfs -cp s3a://<bucket-name>/<file-name> /user/hduser/
                                                                                                                          

We can also use the distcp command to build a data pipeline between S3 and HDFS in either direction:

 

hadoop distcp /user/hduser/jinga.xml s3a://<bucket-name>/
hadoop distcp hdfs:///user/hduser/jinga.xml s3a://<bucket-name>/
hadoop distcp s3a://<bucket-name>/ hdfs:///user/hduser/
 
Mohammed Ismail