big data 6

                                                        PRACTICAL NO – 6

Aim: Implementing Bloom Filter using Map-Reduce.

1.                 import;

2.                 import;

3.                 import;

4.                 import org.apache.hadoop.util.bloom.BloomFilter;

5.                 import org.apache.hadoop.util.bloom.Key;

6.                 import org.apache.hadoop.util.hash.Hash; 7.

8.                 public class DepartmentBloomFilterTrainer {

9.                 public static int getBloomFilterOptimalSize(int numElements, float falsePosRate) {

10.        return (int) (-numElements * (float) Math.log(falsePosRate) / Math.pow(Math.log(2), 2));

11. }

12.        public static int getOptimalK(float numElements, float vectorSize) {

13.        return (int) Math.round(vectorSize * Math.log(2) / numElements);

14. }

15.        public static void main(String[] args) throws IOException {

16.        args = new String[] { "32658", "0.2","Replace this string with Input file location",





17.        "Replace this string with output path location where the bloom filter hot list data will be stored","" };

18.        int numMembers = Integer.parseInt(args[0]);

19.        float falsePosRate = Float.parseFloat(args[1]);

20.        int vectorSize = getBloomFilterOptimalSize(numMembers, falsePosRate);

21.        int nbHash = getOptimalK(numMembers, vectorSize);

22.        BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

23.        ConfigFile configFile = new ConfigFile(args[2], FileType.script, FilePath.absolutePath);

24.        String fileContent = configFile.getFileContent();

25.        String[] fileLine = fileContent.split("\n");

26.        for (String lineData : fileLine) {

27.        String lineDataSplit[] = lineData.split(",", -1);

28.        String departmentName = lineDataSplit[3];

29.        filter.add(new Key(departmentName.getBytes()));

30. }

31.       DataOutputStream dataOut = new DataOutputStream(new FileOutputStream(args[3]));

32.       filter.write(dataOut);

33.       dataOut.flush();

34.       dataOut.close();

35. }

36. }


In the setup method, the Bloom filter file is deserialized and loaded into the Bloom filter. In the map method, the department name is extracted from each input record and tested against the Bloom filter. If the department name is a member, the entire record is written to the output. Ideally, the Bloom filter file should be distributed with DistributedCache, a Hadoop utility that ensures a file in HDFS is present on the local file system of every task that needs it; for simplicity, I am loading it from my local file system instead. Because we trained the Bloom filter with the PUBLIC LIBRARY department, the output of the MapReduce program will contain only employee data relevant to the PUBLIC LIBRARY department (apart from occasional false positives, which a Bloom filter can produce).

1.                 import;

2.                 import;

3.                 import;

4.                 import;

5.                 import;

6.                 import org.apache.hadoop.mapreduce.Mapper;

7.                 import org.apache.hadoop.util.bloom.BloomFilter;

8.                 import org.apache.hadoop.util.bloom.Key; 9.

10.        public class DepartmentBloomFilterMapper extends Mapper<Object, Text, Text, NullWritable> {

11.        private BloomFilter filter = new BloomFilter();

12.        protected void setup(Context context) throws IOException, InterruptedException {

13.        DataInputStream dataInputStream = new DataInputStream(

14.        new FileInputStream(context.getConfiguration().get("bloom_filter_file_location")));

15.        filter.readFields(dataInputStream);

16.        dataInputStream.close();

17. }

18.        public void map(Object key, Text value, Context context) throws

IOException, InterruptedException {

19.        String data = value.toString();

20.        String[] field = data.split(",", -1);

21.        String department = null;

22.        if (null != field && field.length == 9 && field[3].length() > 0) {

23.        department = field[3];

24.        if (filter.membershipTest(new Key(department.getBytes()))) {

25.        context.write(value, NullWritable.get());

26. }

27. }

28. }

29. }

Finally, we use the driver class to check that everything works as expected. The output will contain only the data of employees who belong to the PUBLIC LIBRARY department, which can then be used for further analysis.


1.                 args = new String[] { "Replace this string with Input Path location",

2.                 import;

3.                 import;

4.                 import org.apache.hadoop.conf.Configuration;

5.                 import org.apache.hadoop.fs.Path;

6.                 import;

7.                 import;

8.                 import org.apache.hadoop.mapreduce.Job;

9.                 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

10.        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

11.        import;

12.        11.

13.        public class BloomFilterDriver {

14.        public static void main(String[] args) throws Exception {

15.        "Replace this string with output Path location","Replace this string with Input file location of bloom filter hot list" };

16.        FileUtils.deleteDirectory(new File(args[1]));

17.        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

18.        if (args.length != 3) {

19.        System.err.println("Please specify the input and output path");

20.        System.exit(-1);

21. }

22.        Configuration conf = ConfigurationFactory.getInstance();

23.        conf.set("bloom_filter_file_location",args[2]);

24.        Job job = Job.getInstance(conf);

25.        job.setJarByClass(BloomFilterDriver.class);

26.        job.setJobName("Bloom_Filter_Department");

27.        FileInputFormat.addInputPath(job, new Path(args[0]));

28.        FileOutputFormat.setOutputPath(job, new Path(args[1]));

29.        job.setMapperClass(DepartmentBloomFilterMapper.class);

30.        job.setOutputKeyClass(Text.class);

31.        job.setOutputValueClass(NullWritable.class);

32.        System.exit(job.waitForCompletion(true) ? 0 : 1); 



Popular posts from this blog

big data 8

big data 5

ML programs