MapReduce program to filter records | MapReduce | Java

How to apply a WHERE-style condition using a MapReduce program

Parmanand
2 min read · Sep 10, 2022

As Big Data developers, we all know how important MapReduce is. Many of us skip this topic because it seems complex or unnecessary, but it comes up often in interviews.

In this article, I’ll write a program that filters records based on a condition.

Let’s consider a file that contains a few lines:

1,parmanand,10-02-1994
2,Rahul,10-02-1994
3,Shyam,10-02-1994

Now we will create a MapReduce program to filter the records.

Step 1: Create a Map class that extends the Mapper class

class Map extends Mapper<LongWritable, Text, Text, Text> {

    // Map method: called once for every line of the input file
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Split the comma-separated record into its fields
        String[] line = value.toString().split(",");

        // WHERE-style condition to filter records: keep rows with id > 1
        if (Integer.parseInt(line[0]) > 1) {
            // Write the matching record as the key, with an empty value
            context.write(new Text(value), new Text(""));
        }
    }
}
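Before wiring this into a Hadoop job, here is a minimal standalone sketch (plain Java, no Hadoop; the class name is just for illustration) of what the split-and-filter logic does to the sample lines above:

public class FilterLogicDemo {
    public static void main(String[] args) {
        String[] lines = {
            "1,parmanand,10-02-1994",
            "2,Rahul,10-02-1994",
            "3,Shyam,10-02-1994"
        };
        for (String record : lines) {
            // Same parsing and WHERE-style condition as the mapper: id > 1
            String[] fields = record.split(",");
            if (Integer.parseInt(fields[0]) > 1) {
                System.out.println(record); // kept record
            }
        }
    }
}

This prints only the second and third records.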

The map method takes a key-value pair as input and produces zero or more key-value pairs as output.

Mapper<LongWritable, Text, Text, Text>

Here, the first two type parameters are the input key and value to the map function: with the default TextInputFormat, the key is the byte offset of the line in the file (LongWritable) and the value is the line itself (Text).

The last two type parameters are the output key and value from the map function; here both are Text, since we emit the full record as the key and an empty string as the value.
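Mapping those four type parameters onto this job:

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
//   KEYIN    = LongWritable : byte offset of the line in the input file
//   VALUEIN  = Text         : the line itself, e.g. "2,Rahul,10-02-1994"
//   KEYOUT   = Text         : the record that passed the filter
//   VALUEOUT = Text         : an empty placeholder value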

Note: no reducer is required in this program.
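Setting the number of reduce tasks to zero makes this a map-only job: the shuffle and reduce phases are skipped and the mapper’s output is written directly to the output files. In the driver (shown in full below), that looks like this:

// Zero reducers: the shuffle/sort and reduce phases are skipped entirely
job.setNumReduceTasks(0);

// With no reducer, the job's output types must match the mapper's output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);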

Output:

2,Rahul,10-02-1994
3,Shyam,10-02-1994

In the output you can see that it contains only the records where id > 1.

Complete Java program!

package com.target.mapreduce.filter;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.*;
import java.io.IOException;
import java.util.Date;

class Map extends Mapper<LongWritable, Text, Text, Text> {
    // Map method: called once for every line of the input file
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Split the comma-separated record into its fields
        String[] line = value.toString().split(",");

        // WHERE-style condition: keep only records with id > 1
        if (Integer.parseInt(line[0]) > 1) {
            // Debug logging of the matching record
            System.out.println("mapper input : " + key + " " + value + context.getTaskAttemptID());
            context.write(new Text(value), new Text(""));
        }
    }
}


public class Filter extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("filter");
        job.setJarByClass(Filter.class);

        // No reducer, so the output types are the mapper's output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);

        // Map-only job: skip the reduce phase entirely
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path("/Users/pamkin/IdeaProjects/WordCount/src/main/resources/input/filter.txt"));
        // Timestamped output directory so reruns don't collide with an existing one
        FileOutputFormat.setOutputPath(job, new Path("/Users/pamkin/IdeaProjects/WordCount/src/main/resources/output" + new Date().getTime()));

        // waitForCompletion returns true on success; exit code 0 signals success
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Filter(), args));
    }
}
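Since the input and output paths are hardcoded, you can run main directly from your IDE. Alternatively, package the project into a jar and submit it with hadoop jar <your-jar> com.target.mapreduce.filter.Filter (the jar name depends on your build).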

Thanks for reading!

Please share the article if you liked it. Any comments or suggestions are welcome.
