We hope you are doing well.


Assume that the column headers are alphabets and column values are numbers.


One of the ways to achieve this, is to use DistributedCache. Following are the steps:


Create a file containing the column headers.

In the Driver code, add this file to the distributed cache, by calling Job::addCacheFile()

In the setup() method of the mapper, access this file from the distributed cache. Parse and store the contents of the file in a columnHeader list.

In the map() method, check if the values in each record match the headers (stored in columnnHeader list). If yes, then ignore that record (Because the record just contains the headers). If no, then emit the values along with the column headers.

This is how the Mapper and Driver code looks like:


Driver:


public static void main(String[] args) throws Exception {


    Configuration conf = new Configuration();


    Job job = Job.getInstance(conf, "HeaderParser");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(HeaderParserMapper.class);


    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(NullWritable.class);


    job.addCacheFile(new URI("/in/header.txt#header.txt"));

    FileInputFormat.addInputPath(job, new Path("/in/in7.txt"));

    FileOutputFormat.setOutputPath(job, new Path("/out/"));


    System.exit(job.waitForCompletion(true) ? 0:1);

}


Driver Logic:


Copy "header.txt" (which contains just one line: A,B,C,D) to HDFS

In the Driver, add "header.txt" to distributed cache, by executing following statement:

job.addCacheFile(new URI("/in/header.txt#header.txt"));

Mapper:


public static class HeaderParserMapper

        extends Mapper<LongWritable, Text , Text, NullWritable>{


    String[] headerList;

    String header;


    @Override

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {

        BufferedReader bufferedReader = new BufferedReader(new FileReader("header.txt"));

        header = bufferedReader.readLine();

        headerList = header.split(",");

    }


    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


        String line = value.toString();

        String[] values = line.split(",");


        if(headerList.length == values.length && !header.equals(line)) {

            for(int i = 0; i < values.length; i++)

                context.write(new Text(headerList[i] + "," + values[i]), NullWritable.get());

        }

    }

}


Mapper Logic:


Override setup() method.

Read "header.txt" (which was put in distributed cache in the Driver) in the setup() method.

In the map() method, check if the line matches the header. If yes, then ignore that line. Else, output header and values as (h1,v1), (h2,v2), (h3,v3) and (h4,v4).

I ran this program on the following input:


A,B,C,D

1,2,3,4

5,6,7,8

I got the following output (where values are matched with respective header):


A,1

A,5

B,2

B,6

C,3

C,7

D,4

D,8