PRACTICAL NO – 6
Aim:
Implementing Bloom Filter using Map-Reduce.
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class DepartmentBloomFilterTrainer {

    // Optimal bit-vector size m = -n * ln(p) / (ln 2)^2 for n elements and false positive rate p
    public static int getBloomFilterOptimalSize(int numElements, float falsePosRate) {
        return (int) (-numElements * (float) Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    // Optimal number of hash functions k = (m / n) * ln 2
    public static int getOptimalK(float numElements, float vectorSize) {
        return (int) Math.round(vectorSize * Math.log(2) / numElements);
    }

    public static void main(String[] args) throws IOException {
        args = new String[] { "32658", "0.2", "Replace this string with Input file location",
                "Replace this string with output path location where the bloom filter hot list data will be stored", "" };
        int numMembers = Integer.parseInt(args[0]);
        float falsePosRate = Float.parseFloat(args[1]);
        int vectorSize = getBloomFilterOptimalSize(numMembers, falsePosRate);
        int nbHash = getOptimalK(numMembers, vectorSize);
        BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        // ConfigFile, FileType and FilePath are project helper classes used to read the input file contents
        ConfigFile configFile = new ConfigFile(args[2], FileType.script, FilePath.absolutePath);
        String fileContent = configFile.getFileContent();
        String[] fileLine = fileContent.split("\n");
        for (String lineData : fileLine) {
            String[] lineDataSplit = lineData.split(",", -1);
            String departmentName = lineDataSplit[3];
            filter.add(new Key(departmentName.getBytes()));
        }

        // Serialize the trained Bloom filter to the output location
        DataOutputStream dataOut = new DataOutputStream(new FileOutputStream(args[3]));
        filter.write(dataOut);
        dataOut.flush();
        dataOut.close();
    }
}
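The two sizing helpers above follow the standard Bloom filter formulas m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2. The following is a minimal sketch of how the parameters used in the trainer work out; the class name is hypothetical and the quoted figures are rough estimates from the formulas, not output produced by the original program.

// Assumes DepartmentBloomFilterTrainer (above) is on the classpath.
public class BloomFilterSizingExample {
    public static void main(String[] args) {
        int n = 32658;      // expected number of hot list entries, same value used by the trainer
        float p = 0.2f;     // desired false positive rate
        int m = DepartmentBloomFilterTrainer.getBloomFilterOptimalSize(n, p);
        int k = DepartmentBloomFilterTrainer.getOptimalK(n, m);
        // With these inputs m comes out at roughly 109,000 bits and k rounds to 2 hash functions.
        System.out.println("vectorSize=" + m + ", nbHash=" + k);
    }
}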
In the setup method the Bloom filter file is deserialized and loaded into the Bloom filter. In the map method, the departmentName is extracted from each input record and tested against the Bloom filter; if it is a member, the entire record is written to the output. Ideally the Bloom filter hot list should be loaded through the DistributedCache, a Hadoop facility that ensures a file in HDFS is present on the local file system of every task that needs it; for simplicity it is loaded here from the local file system (a sketch of the distributed-cache approach is shown after the mapper code below). As the Bloom filter was trained with the PUBLIC LIBRARY department, the output of the MapReduce program will contain only employee data relevant to the PUBLIC LIBRARY department.
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class DepartmentBloomFilterMapper extends Mapper<Object, Text, Text, NullWritable> {

    private BloomFilter filter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Deserialize the trained Bloom filter from the location set by the driver
        DataInputStream dataInputStream = new DataInputStream(
                new FileInputStream(context.getConfiguration().get("bloom_filter_file_location")));
        filter.readFields(dataInputStream);
        dataInputStream.close();
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        String[] field = data.split(",", -1);
        String department = null;
        if (null != field && field.length == 9 && field[3].length() > 0) {
            department = field[3];
            // Emit the whole record only if the department passes the membership test
            if (filter.membershipTest(new Key(department.getBytes()))) {
                context.write(value, NullWritable.get());
            }
        }
    }
}
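As noted above, the hot list file is better shipped to the tasks through Hadoop's distributed cache than read from a fixed local path. The fragment below is a minimal sketch of that approach for Hadoop 2.x; the HDFS path and symlink name are placeholders, not values from the original program.

// In the driver, register the serialized Bloom filter in the distributed cache.
// The "#bloomfilter" fragment creates a symlink of that name in each task's working directory.
job.addCacheFile(new java.net.URI("/user/hadoop/bloom/department_hot_list#bloomfilter"));

// In the mapper, read the localized copy through the symlink instead of an absolute local path.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    DataInputStream in = new DataInputStream(new FileInputStream("bloomfilter"));
    filter.readFields(in);
    in.close();
}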
Finally, the driver class is used to verify that everything works as expected. The output will contain only the records of employees who belong to the PUBLIC LIBRARY department (subject to the configured false positive rate), which can then be used for further analysis; a small stand-alone membership check is sketched after the driver code.
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class BloomFilterDriver {

    public static void main(String[] args) throws Exception {
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location",
                "Replace this string with Input file location of bloom filter hot list" };
        // Delete any previous output so the job can recreate the output directory
        FileUtils.deleteDirectory(new File(args[1]));
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");
        if (args.length != 3) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }
        Configuration conf = ConfigurationFactory.getInstance();
        // Pass the Bloom filter file location to the mapper through the job configuration
        conf.set("bloom_filter_file_location", args[2]);
        Job job = Job.getInstance(conf);
        job.setJarByClass(BloomFilterDriver.class);
        job.setJobName("Bloom_Filter_Department");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DepartmentBloomFilterMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
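To see the filtering behaviour the driver relies on in isolation, the following minimal sketch trains a small filter with the PUBLIC LIBRARY department and tests two keys against it. The class name, filter size and sample department names are assumptions for illustration only; the real job uses the sizes computed by the trainer above.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterSanityCheck {
    public static void main(String[] args) {
        // Small filter purely for illustration: 1000-bit vector, 2 hash functions
        BloomFilter filter = new BloomFilter(1000, 2, Hash.MURMUR_HASH);
        filter.add(new Key("PUBLIC LIBRARY".getBytes()));

        // A trained key always tests positive; an untrained key usually tests negative,
        // but can occasionally come back as a false positive.
        System.out.println(filter.membershipTest(new Key("PUBLIC LIBRARY".getBytes())));  // true
        System.out.println(filter.membershipTest(new Key("FIRE DEPARTMENT".getBytes()))); // very likely false
    }
}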