========================================================
HADOOP (Movie Analytics) - The same steps apply to the other programs; only the code and the project/class/jar names change.
========================================================
PROJECT NAME:
MovieAnalytics

OBJECTIVE:
Design a distributed application using MapReduce which processes Movie dataset.
Recommend movies based on user ratings using Hadoop pseudo-distributed mode.

========================================================
DATASET FORMAT (IMPORTANT)
========================================================

userId,movieId,rating,timestamp

NOTE:
- No header row (the file contains NO column names)
- Comma separated values only

EXAMPLE:
1,31,2.5,1260759144
1,1029,3,1260759179
1,1061,3,1260759182
1,1129,2,1260759185
1,1172,4,1260759205

========================================================
PROJECT CREATION IN ECLIPSE
========================================================

file > new > Java Project
give project name: MovieAnalytics
select src 


in 'Libraries' tab:
add external jars

add all jars from:
 /usr/lib/hadoop
 /usr/lib/hadoop/client

click finish

-----------------------------------------------

CREATE CLASS:

right click src → New → Class
give class name: MovieAnalytics
click finish

-----------------------------------------------

now write Java MapReduce code

-----------------------------------------------

EXPORT JAR:

right click the project name → Export → Java → JAR file
save as: MovieAnalytics.jar

========================================================
HADOOP COMMAND STEPS
========================================================

STEP 1: create input folder in HDFS
------------------------------------------------
hdfs dfs -mkdir /inputfolder1


STEP 2: create dataset file
------------------------------------------------
cat > movies.txt

(paste dataset eg.
1,31,2.5,1260759144
1,1029,3,1260759179
1,1061,3,1260759182
...)
press CTRL + D


STEP 3: put file into HDFS
------------------------------------------------
hdfs dfs -put /home/cloudera/movies.txt /inputfolder1/


STEP 4: execute program (the output folder, here /out7, must not already exist in HDFS)
------------------------------------------------
hadoop jar /home/cloudera/MovieAnalytics.jar MovieAnalytics /inputfolder1/movies.txt /out7


STEP 5: view output
------------------------------------------------
hdfs dfs -cat /out7/part-r-00000






---------******---------

codes:

1)
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that totals, per IP address, the number of minutes between
 * login and logout found in a comma-separated log file.
 *
 * <p>Each usable input line has at least eight comma-separated fields; the
 * mapper reads field 1 as the IP, field 5 as the login time and field 7 as the
 * logout time, both formatted as {@code d/M/yyyy H:mm}.
 *
 * <p>Note: despite the class name, the job writes one summed duration for
 * every IP; it does not itself pick out the maximum.
 */
public class MaxLoginTime {

    /**
     * Emits {@code (ip, sessionMinutes)} for every line whose login and logout
     * times can be parsed. Lines with too few fields or unparseable dates are
     * skipped.
     */
    public static class LogMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** A line needs indexes 0..7 to be present. */
        private static final int MIN_FIELDS = 8;

        private static final long MILLIS_PER_MINUTE = 60L * 1000L;

        private final Text ip = new Text();
        private final LongWritable durationValue = new LongWritable();

        // SimpleDateFormat is not thread-safe, so it is kept per mapper
        // instance instead of being shared through a static field.
        private final SimpleDateFormat sdf =
                new SimpleDateFormat("d/M/yyyy H:mm");

        /**
         * @param key     input key supplied by the framework (unused)
         * @param value   one line of the log file
         * @param context sink for the {@code (ip, minutes)} pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // CSV split
            String[] fields = value.toString().split(",");

            // Ignore lines that cannot contain all required columns
            if (fields.length < MIN_FIELDS) {
                return;
            }

            long duration;
            try {
                Date loginDate = sdf.parse(fields[5].trim());
                Date logoutDate = sdf.parse(fields[7].trim());
                duration = (logoutDate.getTime() - loginDate.getTime())
                        / MILLIS_PER_MINUTE;
            } catch (java.text.ParseException e) {
                // Only a bad date marks the record as invalid; failures while
                // writing output are real errors and must not be hidden here.
                System.out.println("Skipping Invalid Record");
                return;
            }

            ip.set(fields[1].trim());
            durationValue.set(duration);

            context.write(ip, durationValue);
        }
    }

    /**
     * Adds up all durations received for one IP. Because it only sums and its
     * input and output types are identical, the driver also uses it as the
     * combiner.
     */
    public static class SumReducer
            extends Reducer<Text, LongWritable,
                            Text, LongWritable> {

        private final LongWritable result = new LongWritable();

        /**
         * @param key     the IP address
         * @param values  durations (minutes) emitted for that IP
         * @param context sink for the {@code (ip, totalMinutes)} pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void reduce(Text key,
                           Iterable<LongWritable> values,
                           Context context)
                throws IOException, InterruptedException {

            long total = 0;

            for (LongWritable val : values) {
                total += val.get();
            }

            result.set(total);

            context.write(key, result);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Fail with a readable message instead of ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println(
                    "Usage: MaxLoginTime <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Max Login Time");

        job.setJarByClass(MaxLoginTime.class);

        job.setMapperClass(LogMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job,
                new Path(args[0]));

        FileOutputFormat.setOutputPath(job,
                new Path(args[1]));

        System.exit(job.waitForCompletion(true)
                ? 0 : 1);
    }
}


*************

2)

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word-count job: counts how many times each whitespace-delimited token
 * occurs in the input.
 */
public class WordCount {

  /** Emits {@code (token, 1)} for every token of the incoming line. */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable unit = new IntWritable(1);
    private Text token = new Text();

    /**
     * @param key     input key supplied by the framework (unused)
     * @param value   one line of input text
     * @param context sink for the {@code (token, 1)} pairs
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      for (StringTokenizer tokens = new StringTokenizer(value.toString());
           tokens.hasMoreTokens(); ) {
        token.set(tokens.nextToken());
        context.write(token, unit);
      }
    }
  }

  /** Adds up the counts received for one token; also used as the combiner. */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable total = new IntWritable();

    /**
     * @param key     the token
     * @param values  partial counts for that token
     * @param context sink for the {@code (token, count)} pair
     */
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int running = 0;
      for (IntWritable partial : values) {
        running = running + partial.get();
      }
      total.set(running);
      context.write(key, total);
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} = input path, {@code args[1]} = output path
   */
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);

    // map -> combine -> reduce pipeline
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // final (key, value) types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    Path input = new Path(args[0]);
    FileInputFormat.addInputPath(job, input);
    Path output = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, output);

    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }
}

************

3) 
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that reports, for every track, the number of distinct users
 * seen for it and how many records carry a shared flag of 1.
 *
 * <p>Usable input lines have at least five comma-separated fields; field 0 is
 * read as the user id, field 1 as the track id and field 2 as the shared flag.
 */
public class MusicAnalytics {

    /**
     * Emits {@code (trackId, "userId,sharedFlag")} for every well-formed line.
     * Lines with too few fields or a non-integer shared flag (for example a
     * header row) are skipped here, so the reducer only receives values it
     * can parse.
     */
    public static class MusicMapper
            extends Mapper<Object, Text, Text, Text> {

        private final Text trackId = new Text();
        private final Text userShared = new Text();

        /**
         * @param key     input key supplied by the framework (unused)
         * @param value   one line of the input file
         * @param context sink for the {@code (trackId, "userId,shared")} pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");

            if (fields.length < 5) {
                return;
            }

            String userId = fields[0].trim();
            String track = fields[1].trim();
            String shared = fields[2].trim();

            // Reject the record now: an unparseable flag would otherwise make
            // Integer.parseInt throw in the reducer and fail the whole job.
            if (!isInteger(shared)) {
                return;
            }

            trackId.set(track);

            // value = userId,sharedFlag
            userShared.set(userId + "," + shared);

            context.write(trackId, userShared);
        }

        /** Returns whether {@code text} is accepted by {@link Integer#parseInt(String)}. */
        private static boolean isInteger(String text) {
            try {
                Integer.parseInt(text);
                return true;
            } catch (NumberFormatException e) {
                return false;
            }
        }
    }

    /**
     * Counts distinct user ids and shared==1 records for one track and writes
     * {@code "UniqueListeners=<n>, Shares=<m>"}.
     */
    public static class MusicReducer
            extends Reducer<Text, Text, Text, Text> {

        /**
         * @param key     the track id
         * @param values  {@code "userId,sharedFlag"} strings for that track
         * @param context sink for the per-track summary
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void reduce(Text key,
                           Iterable<Text> values,
                           Context context)
                throws IOException, InterruptedException {

            // All distinct user ids of one track are held in memory at once.
            HashSet<String> uniqueUsers = new HashSet<>();

            int shareCount = 0;

            for (Text val : values) {

                String[] parts = val.toString().split(",");

                String user = parts[0];
                int shared = Integer.parseInt(parts[1]);

                uniqueUsers.add(user);

                if (shared == 1) {
                    shareCount++;
                }
            }

            String result =
                    "UniqueListeners=" + uniqueUsers.size()
                    + ", Shares=" + shareCount;

            context.write(key, new Text(result));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Fail with a readable message instead of ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println(
                    "Usage: MusicAnalytics <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Music Analytics");

        job.setJarByClass(MusicAnalytics.class);

        job.setMapperClass(MusicMapper.class);
        job.setReducerClass(MusicReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job,
                new Path(args[0]));

        FileOutputFormat.setOutputPath(job,
                new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



*************

4)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that sums, for every track, the radio and skip flags found in
 * the input.
 *
 * <p>Expected line layout: {@code UserId,TrackId,Shared,Radio,Skip}.
 */
public class MusicAnalytics {

    /**
     * Emits {@code (trackId, "radio,skip")} for every well-formed line. Lines
     * with too few fields or a non-integer radio/skip value (for example a
     * header row) are skipped here, so the reducer only receives values it
     * can parse.
     */
    public static class MusicMapper
            extends Mapper<Object, Text, Text, Text> {

        private final Text trackId = new Text();
        private final Text radioSkip = new Text();

        /**
         * @param key     input key supplied by the framework (unused)
         * @param value   one line of the input file
         * @param context sink for the {@code (trackId, "radio,skip")} pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");

            // Expecting: UserId,TrackId,Shared,Radio,Skip
            if (fields.length < 5) {
                return;
            }

            String track = fields[1].trim();
            String radio = fields[3].trim();
            String skip = fields[4].trim();

            // Reject the record now: an unparseable flag would otherwise make
            // Integer.parseInt throw in the reducer and fail the whole job.
            if (!isInteger(radio) || !isInteger(skip)) {
                return;
            }

            trackId.set(track);

            // value = radio,skip
            radioSkip.set(radio + "," + skip);

            context.write(trackId, radioSkip);
        }

        /** Returns whether {@code text} is accepted by {@link Integer#parseInt(String)}. */
        private static boolean isInteger(String text) {
            try {
                Integer.parseInt(text);
                return true;
            } catch (NumberFormatException e) {
                return false;
            }
        }
    }

    /**
     * Adds up the radio and skip values of one track and writes
     * {@code "RadioCount=<n>, SkipCount=<m>"}.
     */
    public static class MusicReducer
            extends Reducer<Text, Text, Text, Text> {

        /**
         * @param key     the track id
         * @param values  {@code "radio,skip"} strings for that track
         * @param context sink for the per-track summary
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void reduce(Text key,
                           Iterable<Text> values,
                           Context context)
                throws IOException, InterruptedException {

            int radioCount = 0;
            int skipCount = 0;

            for (Text val : values) {

                String[] parts = val.toString().split(",");

                radioCount += Integer.parseInt(parts[0]);
                skipCount += Integer.parseInt(parts[1]);
            }

            String result = "RadioCount=" + radioCount
                          + ", SkipCount=" + skipCount;

            context.write(key, new Text(result));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Fail with a readable message instead of ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println(
                    "Usage: MusicAnalytics <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Music Analytics Radio Skip Count");

        job.setJarByClass(MusicAnalytics.class);

        job.setMapperClass(MusicMapper.class);
        job.setReducerClass(MusicReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


************

5)
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that computes the average rating of every movie and writes
 * the ten movies with the highest average.
 *
 * <p>Input lines are comma-separated with at least four fields; field 1 is
 * read as the movie id and field 2 as the rating.
 */
public class MovieAnalytics {

    /**
     * Emits {@code (movieId, rating)} for every line that has at least four
     * fields and a numeric rating; other lines are skipped.
     */
    public static class MovieMapper
            extends Mapper<Object, Text, Text, DoubleWritable> {

        private final Text movieId = new Text();
        private final DoubleWritable ratingValue = new DoubleWritable();

        /**
         * @param key     input key supplied by the framework (unused)
         * @param value   one line of the ratings file
         * @param context sink for the {@code (movieId, rating)} pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");

            if (fields.length < 4) {
                return;
            }

            double rating;
            try {
                rating = Double.parseDouble(fields[2].trim());
            } catch (NumberFormatException e) {
                // Bad record: only the parse failure is ignored, so that
                // errors raised while writing output still surface.
                return;
            }

            movieId.set(fields[1].trim());
            ratingValue.set(rating);

            context.write(movieId, ratingValue);
        }
    }

    /**
     * Averages the ratings of each movie and, once all keys of this reducer
     * have been processed, writes the {@code TOP_N} best averages in
     * descending order as {@code movieId -> "AvgRating=<avg>"}.
     *
     * <p>Only the current best {@code TOP_N} entries are retained, so memory
     * use does not grow with the number of movies.
     */
    public static class MovieReducer
            extends Reducer<Text, DoubleWritable, Text, Text> {

        /** Number of best-rated movies written to the output. */
        private static final int TOP_N = 10;

        /**
         * Best-first order: higher average first; equal averages are ordered
         * by movie id (string comparison) so the output is deterministic.
         */
        private static final Comparator<Map.Entry<String, Double>> BEST_FIRST =
                new Comparator<Map.Entry<String, Double>>() {
                    public int compare(Map.Entry<String, Double> a,
                                       Map.Entry<String, Double> b) {
                        int byRating =
                                Double.compare(b.getValue(), a.getValue());
                        if (byRating != 0) {
                            return byRating;
                        }
                        return a.getKey().compareTo(b.getKey());
                    }
                };

        // Heap ordered worst-first: its head is always the entry to evict.
        private final PriorityQueue<Map.Entry<String, Double>> best =
                new PriorityQueue<Map.Entry<String, Double>>(
                        TOP_N + 1, Collections.reverseOrder(BEST_FIRST));

        /**
         * Computes the average rating of one movie and offers it to the
         * top-N heap. Nothing is written to {@code context} here.
         *
         * @param key     the movie id
         * @param values  all ratings emitted for that movie
         * @param context unused; output is written in {@link #cleanup}
         */
        public void reduce(Text key,
                           Iterable<DoubleWritable> values,
                           Context context)
                throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }

            double avg = (count == 0) ? 0.0 : (sum / count);

            best.add(new AbstractMap.SimpleEntry<String, Double>(
                    key.toString(), avg));

            // Drop the current worst entry once more than TOP_N are held
            if (best.size() > TOP_N) {
                best.poll();
            }
        }

        /**
         * Writes the retained entries, best average first.
         *
         * @param context sink for the {@code (movieId, "AvgRating=...")} pairs
         * @throws IOException          if writing an output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

            List<Map.Entry<String, Double>> ranked =
                    new ArrayList<Map.Entry<String, Double>>(best);

            // The heap's iteration order is unspecified, so sort explicitly
            Collections.sort(ranked, BEST_FIRST);

            for (Map.Entry<String, Double> entry : ranked) {
                String result = "AvgRating=" + entry.getValue();

                context.write(new Text(entry.getKey()), new Text(result));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Fail with a readable message instead of ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println(
                    "Usage: MovieAnalytics <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Top 10 Movie Recommendation");

        job.setJarByClass(MovieAnalytics.class);

        job.setMapperClass(MovieMapper.class);
        job.setReducerClass(MovieReducer.class);

        // The ranking is built inside one reducer instance; with several
        // reducers each would write its own top 10, so pin the count to one.
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
