big data 7

                                                     PRACTICAL NO – 7

Aim: Implementing the Frequent Itemset (Apriori) Algorithm Using MapReduce.
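The program below implements Apriori as a chain of Hadoop MapReduce jobs: pass 1 counts individual items, and every later pass k reads the frequent (k-1)-itemsets of the previous pass, generates candidate k-itemsets, counts how often each candidate occurs in the transactions with the help of a hash tree, and keeps only those candidates that meet the minimum support. The driver takes five command-line arguments: the HDFS input directory, an output directory prefix, the number of passes, the minimum support percentage, and the total number of transactions.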

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import model.HashTreeNode;
import model.ItemSet;
import model.Transaction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import utils.AprioriUtils;
import utils.HashTreeUtils;

/*
 * A parallel Hadoop-based Apriori algorithm.
 */
public class MRApriori extends Configured implements Tool {

    private static String jobPrefix = "MRApriori Algorithm Phase ";

    // TODO: This is bad, as I am using a global shared variable between functions which should
    // ideally be a function parameter. Need to fix this later. These parameters are required in
    // the reducer logic and have to be dynamic. How can I pass some initialisation parameters
    // to the reducer?
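    // Note: the per-pass parameters are in fact handed to the mapper/reducer through the job
    // Configuration, as done below in runPassKMRJob() and read back in AprioriReducer:
    //
    //     passKMRConf.setInt("passNum", passNum);
    //     passKMRConf.set("minSup", Double.toString(MIN_SUPPORT_PERCENT));
    //     passKMRConf.setInt("numTxns", MAX_NUM_TXNS);
    //
    //     Double minSup = Double.parseDouble(context.getConfiguration().get("minSup"));
    //     Integer numTxns = context.getConfiguration().getInt("numTxns", 2);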

 

    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length != 5) {
            System.err.println("Incorrect number of command line args. Exiting !!");
            return -1;
        }

        String hdfsInputDir = args[0];
        String hdfsOutputDirPrefix = args[1];
        int maxPasses = Integer.parseInt(args[2]);
        Double MIN_SUPPORT_PERCENT = Double.parseDouble(args[3]);
        Integer MAX_NUM_TXNS = Integer.parseInt(args[4]);

        System.out.println("InputDir         : " + hdfsInputDir);
        System.out.println("OutputDir Prefix : " + hdfsOutputDirPrefix);
        System.out.println("Number of Passes : " + maxPasses);
        System.out.println("MinSupPercent    : " + MIN_SUPPORT_PERCENT);
        System.out.println("Max Txns         : " + MAX_NUM_TXNS);

        long startTime = System.currentTimeMillis();
        long endTime = System.currentTimeMillis();

        // Run one MapReduce job per Apriori pass.
        for (int passNum = 1; passNum <= maxPasses; passNum++) {
            endTime = System.currentTimeMillis();
            boolean isPassKMRJobDone = runPassKMRJob(hdfsInputDir, hdfsOutputDirPrefix, passNum,
                    MIN_SUPPORT_PERCENT, MAX_NUM_TXNS);
            if (!isPassKMRJobDone) {
                System.err.println("Phase " + passNum + " MapReduce job failed. Exiting !!");
                return -1;
            }
            System.out.println("For pass " + passNum + " = " + (System.currentTimeMillis() - endTime));
        }

        endTime = System.currentTimeMillis();
        System.out.println("Total time taken = " + (endTime - startTime));

        return 1;
    }
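    // Example invocation (the jar name, paths and values here are hypothetical placeholders,
    // not taken from the practical; the five arguments are parsed in run() above as inputDir,
    // outputDirPrefix, maxPasses, minSupportPercent and maxNumTxns):
    //
    //     hadoop jar mrapriori.jar MRApriori /user/hduser/txns.txt /user/hduser/mrapriori-out- 3 50.0 4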

    private static boolean runPassKMRJob(String hdfsInputDir, String hdfsOutputDirPrefix, int passNum,
            Double MIN_SUPPORT_PERCENT, Integer MAX_NUM_TXNS)
            throws IOException, InterruptedException, ClassNotFoundException {

        boolean isMRJobSuccess = false;

        Configuration passKMRConf = new Configuration();
        passKMRConf.setInt("passNum", passNum);
        passKMRConf.set("minSup", Double.toString(MIN_SUPPORT_PERCENT));
        passKMRConf.setInt("numTxns", MAX_NUM_TXNS);
        System.out.println("Starting AprioriPhase" + passNum + "Job");

        if (passNum > 1) {
            // Make the frequent itemsets of the previous pass available to the mappers.
            DistributedCache.addCacheFile(
                    URI.create("hdfs://127.0.0.1:54310" + hdfsOutputDirPrefix + (passNum - 1) + "/part-r-00000"),
                    passKMRConf);
            System.out.println("Added to distributed cache the output of pass " + (passNum - 1));
        }

        Job aprioriPassKMRJob = new Job(passKMRConf, jobPrefix + passNum);

        if (passNum == 1) {
            configureAprioriJob(aprioriPassKMRJob, AprioriPass1Mapper.class);
        } else {
            configureAprioriJob(aprioriPassKMRJob, AprioriPassKMapper.class);
        }

        FileInputFormat.addInputPath(aprioriPassKMRJob, new Path(hdfsInputDir));
        // System.out.println("saurabh " + new Path(hdfsInputDir));
        FileOutputFormat.setOutputPath(aprioriPassKMRJob, new Path(hdfsOutputDirPrefix + passNum));

        isMRJobSuccess = (aprioriPassKMRJob.waitForCompletion(true) ? true : false);
        System.out.println("Finished AprioriPhase" + passNum + "Job");

        return isMRJobSuccess;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void configureAprioriJob(Job aprioriJob, Class mapperClass) {
        aprioriJob.setJarByClass(MRApriori.class);
        aprioriJob.setMapperClass(mapperClass);
        aprioriJob.setReducerClass(AprioriReducer.class);
        aprioriJob.setOutputKeyClass(Text.class);
        aprioriJob.setOutputValueClass(IntWritable.class);
    }

    // Utility functions

    // Phase 1 - MapReduce

    public static class AprioriPass1Mapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text item = new Text();

        // Emit (itemId, 1) for every item in the transaction.
        public void map(Object key, Text txnRecord, Context context) throws IOException, InterruptedException {
            Transaction txn = AprioriUtils.getTransaction(txnRecord.toString());
            for (Integer itemId : txn.getItems()) {
                item.set(itemId.toString());
                context.write(item, one);
            }
        }
    }

    public static class AprioriReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text itemset, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int countItemId = 0;
            for (IntWritable value : values) {
                countItemId += value.get();
            }

            // TODO: This can be improved. Creating too many strings.
            String itemsetIds = itemset.toString();
            itemsetIds = itemsetIds.replace("[", "");
            itemsetIds = itemsetIds.replace("]", "");
            itemsetIds = itemsetIds.replace(" ", "");

            Double minSup = Double.parseDouble(context.getConfiguration().get("minSup"));
            Integer numTxns = context.getConfiguration().getInt("numTxns", 2);

            // If the itemset has minimum support, then it is a large (frequent) itemset.
            if (AprioriUtils.hasMinSupport(minSup, numTxns, countItemId)) {
                context.write(new Text(itemsetIds), new IntWritable(countItemId));
            }
        }
    }
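    // The support check used above lives in utils.AprioriUtils, which is not listed in this
    // practical. A minimal sketch, assuming minSup is a percentage of the transaction count:
    //
    //     public static boolean hasMinSupport(double minSupPercent, int numTxns, int supportCount) {
    //         return supportCount >= (minSupPercent / 100.0) * numTxns;
    //     }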

 

    // Phase 2 - MapReduce
    public static class AprioriPassKMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text item = new Text();

        private List<ItemSet> largeItemsetsPrevPass = new ArrayList<ItemSet>();
        private List<ItemSet> candidateItemsets = null;
        private HashTreeNode hashTreeRootNode = null;

        @Override
        public void setup(Context context) throws IOException {
            // Path[] uris = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            int passNum = context.getConfiguration().getInt("passNum", 2);
            String opFileLastPass = context.getConfiguration().get("fs.default.name")
                    + "/user/hduser/mrapriori-out-" + (passNum - 1) + "/part-r-00000";

            try {
                // Read the frequent itemsets produced by the previous pass from HDFS.
                Path pt = new Path(opFileLastPass);
                FileSystem fs = FileSystem.get(context.getConfiguration());
                BufferedReader fis = new BufferedReader(new InputStreamReader(fs.open(pt)));
                String currLine = null;

                while ((currLine = fis.readLine()) != null) {
                    currLine = currLine.trim();
                    String[] words = currLine.split("[\\s\\t]+");
                    if (words.length < 2) {
                        continue;
                    }

                    // All words except the last hold comma-separated item ids.
                    List<Integer> items = new ArrayList<Integer>();
                    for (int k = 0; k < words.length - 1; k++) {
                        String csvItemIds = words[k];
                        String[] itemIds = csvItemIds.split(",");
                        for (String itemId : itemIds) {
                            items.add(Integer.parseInt(itemId));
                        }
                    }

                    // The last word is the support count of the itemset.
                    String finalWord = words[words.length - 1];
                    int supportCount = Integer.parseInt(finalWord);
                    largeItemsetsPrevPass.add(new ItemSet(items, supportCount));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            candidateItemsets = AprioriUtils.getCandidateItemsets(largeItemsetsPrevPass, (passNum - 1));
            hashTreeRootNode = HashTreeUtils.buildHashTree(candidateItemsets, passNum); // This would be changed later
        }

        // Emit (candidate itemset, 1) for every candidate contained in the transaction.
        public void map(Object key, Text txnRecord, Context context) throws IOException, InterruptedException {
            Transaction txn = AprioriUtils.getTransaction(txnRecord.toString());
            List<ItemSet> candidateItemsetsInTxn = HashTreeUtils.findItemsets(hashTreeRootNode, txn, 0);
            for (ItemSet itemset : candidateItemsetsInTxn) {
                item.set(itemset.getItems().toString());
                context.write(item, one);
            }
        }
    }
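    // HashTreeUtils.findItemsets(root, txn, 0) is assumed to return the candidate itemsets that
    // are contained in the given transaction; the hash tree only makes this lookup faster. A
    // naive, hash-tree-free equivalent of that lookup (a sketch, not the library's code) would be:
    //
    //     List<ItemSet> candidatesInTxn = new ArrayList<ItemSet>();
    //     for (ItemSet candidate : candidateItemsets) {
    //         if (txn.getItems().containsAll(candidate.getItems())) {
    //             candidatesInTxn.add(candidate);
    //         }
    //     }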

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRApriori(), args);
        System.exit(exitCode);
    }
}
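The listing above depends on helper classes (model.Transaction, model.ItemSet, model.HashTreeNode, utils.AprioriUtils, utils.HashTreeUtils) that are not reproduced in this practical. Going only by the methods the listing calls, minimal sketches of the two model classes might look like the following (each in its own file under the model package, with an import of java.util.List; the field names and constructors are assumptions):

// model/Transaction.java (sketch)
public class Transaction {
    private final List<Integer> items;   // item ids appearing in one transaction line

    public Transaction(List<Integer> items) { this.items = items; }

    public List<Integer> getItems() { return items; }
}

// model/ItemSet.java (sketch)
public class ItemSet {
    private final List<Integer> items;   // item ids forming the itemset
    private final int supportCount;      // number of transactions containing the itemset

    public ItemSet(List<Integer> items, int supportCount) {
        this.items = items;
        this.supportCount = supportCount;
    }

    public List<Integer> getItems() { return items; }
    public int getSupportCount() { return supportCount; }
}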


OUTPUT:

1) OUTPUT 1 - frequent 1-itemsets with their support counts:

2      3
3      3
5      3

2) OUTPUT 2 - frequent itemsets with their support counts:

1,3    2
2,3    2
2,5    3
3,5    2
2,3,5  2

