Benutzer-Werkzeuge

Webseiten-Werkzeuge


nosql:mapreduce_hadoop_20_job

Einen eigenen ersten MapReduce Job erstellen

Einleitung

  • Ein MapReduce Job verdichtet die Daten Schritt für Schritt
  • Die Verarbeitung erfolgt Listen orientiert
  • Sehr gut für die Parallelisierung geeignet
  • Ursprung in der funktionalen Programmierung (Funktion map() und fold() bzw reduce())

Zwei Haupt Phasen:

  • Map
    • Liest Key/Value Paare ein und gibt Key/Value Paare je nach Bedarf wieder aus ( wie z.b. sortiert)
    • map (input) → list(intermediate_value)
  • Reduce
    • Verarbeitet die Key/Value Paare und giebt ebenfalls Key/Value Paare wieder aus
    • reduce (out_key, list(intermediate_value)) → list(out_value)

Ein erstes Beispiel

Übersicht - eine Liste aller in den Texten vorkommenden Auto Hersteller erstellen:

Übersicht MapReduce

Einen eigenen MapReduce in Java für Hadoop entwickeln

Übersicht über die generelle Architektur

Im Beispiel sollen die Buchstaben in den Daten gezählt werden:

MapReduce mit Hadoop

Die Elemente im einzelnen:

Client:

  • Der Client Driver konfiguriert den Job und sendet Job Anfrage an das Cluster
  • Über das Interface „JobConf“ wird der Job konfiguriert

InputFormater:

  • Der Mapper Prozesse verwendet einen InputFormater um die Daten zu lesen

Mapper:

  • Der Mapper ließt die Daten in Key/Value Format ein
  • Pro Zeile (je nach InputFormater ) wird die map Methode einmal aufgerufen
    • Im Default ist der Key der Offset der Zeile (Zeilennummer)
  • Der Mapper erstellt eine Liste mit Output Values im Key/Value Format
  • Mapper speicher Daten lokal und werden vom Reducer dann abgeholt
    • Konfigurierbar ab wievel % bereits vor Abschluss Daten bereitgestellt werden sollen
  • Der eigene Mapper wird von der Klasse „Mapper<Object, Text, Text, IntWritable>„ abgeleitet
  • Pro split/Block einer Datei wird eine Mapper Instance gestartet

Shuffle-And-Sort:

  • Zuständig für das Sortieren der Ergebnisse der einzelnen Mapper als Input für den Reducer
  • Ein Teil wird im Mapper,ein Teil im Reducer durchgeführt

Combiner:

  • Läuft auf dem selben Knoten wie der Mapper
  • Dient zur Verdichtung von Ergebnissen des Mappers
  • Wird aber nur bei Bedarf verwendet
  • Kann gleiche Klasse wie der Reducer sein falls (nur bei distributiven Funktionen wie A=B+C B=A+C)

Partitioner:

  • Legt fest wie die Daten auf die Reducer verteilt werden sollen

Sort:

  • Vor der Verarbeitung durch den Reducer werden die Daten sortiert

Reducer:

  • Verarbeitet die Zwischenergebnisse
  • Erzeugt das Endergebnis
  • Die eigene Reducer Klasse wird von der Klasse „Reducer<Text, IntWritable, Text, IntWritable>„ abgeleitet

OutputFormat:

  • Das OutputFormat legt fest wie der Reducer die Daten im HDFS ablegt
  • Default: Tab Format

Implementierung

Mit Hadoop 2 hat sich die API geändert, es muss daher das Package „org.apache.hadoop.mapreduce“ verwendet werden.

Umgebung einrichten

Ziel ist es den Job mit dem JDeveloper auf einem Windows7 Rechner zu erstellen.
Größte Herausforderung ist dabei eine laufähige Umgebung unter Win7 einzurichten.
Mit MS Windows Clients mit Hadoop arbeiten

Für den lokalen Test kann ist die Implementierung des „hadoop Tool interfaces“ notwendig!

Beispiel siehe ⇒ http://hadoopi.wordpress.com/2013/06/05/hadoop-implementing-the-tool-interface-for-mapreduce-driver/

hadoop jar <jar file> <startClass> -fs file:/// -jt local <local input Dir> <local output Dir>
Die Aufgabe

Es soll das in Oracle typische Beispiel, wie viele Mitarbeiter arbeiten in welcher Abteilung umgesetzt werden:

SELECT COUNT(*), deptno FROM emp
/

Zur Zeit allerdings erstmal das Sum Beispiel implementiert um einen ersten einfachen Test zumzuseten.

Die Daten werden im dem typischem emp format als CSV Liste im HDFS abgelegt.

Die klassischen Testdaten erzeugen:

SET linesize 1000 
SET trimspool ON 
SET pagesize 0 
SET feedback off
 
spool /tmp/emp.csv
 
SELECT empno || ':' ||
       ename || ':' ||
       job   || ':' ||
       mgr   || ':' ||
       to_char(hiredate,'dd.mm.yyyy')|| ':' ||
       sal   || ':' ||
       comm  || ':' ||
       deptno
 FROM  scott.emp
ORDER BY ename
/
 
spool off

CSV auf das HDFS kopieren

hdfs dfs -put /tmp/emp.csv /user/gpipperr
hdfs dfs -cat /user/gpipperr/emp.csv
 
7499:ALLEN:SALESMAN:7698:20.02.1981:1600:300:30
7782:CLARK:MANAGER:7839:09.06.1981:2450::10
7566:JONES:MANAGER:7839:02.04.1981:2975::20
7839:KING:PRESIDENT::17.11.1981:5000::10
7654:MARTIN:SALESMAN:7698:28.09.1981:1250:1400:30
7934:MILLER:CLERK:7782:23.01.1982:1300::10
7844:TURNER:SALESMAN:7698:08.09.1981:1500:0:30
7521:WARD:SALESMAN:7698:22.02.1981:1250:500:30
Die Mapper Klasse
package gpi.hadoop;
 
 
import java.io.IOException;
 
import java.util.StringTokenizer;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
// input Key - input Value - output Key - output Value
public class DeptCountMapper extends Mapper<Object, Text, Text, IntWritable> {
 
 
    static IntWritable oneValue = new IntWritable(1);
 
    @Override
    // input Key - input Value - output Value
    public void map(Object key, Text value, Context contex) throws IOException, InterruptedException {
        /*
        input:
        0    1     2      3    4         5     6   7
        7876:ADAMS:CLERK:7788:12.01.1983:1100:300:20
 
        Split in Key/Values pairs
 
        20,1 
 
       */
 
        // read on row
        String[] emprow = value.toString().split(":");
 
        String deptno = emprow[7];
 
        contex.write(new Text(deptno), oneValue);      
 
    }
}
Die Reducer Klasse
package gpi.hadoop;
 
import java.io.IOException;
 
import java.util.Iterator;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
 
public class DeptCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 
    private IntWritable totalWordCount = new IntWritable();
 
       @Override
    public void reduce(Text deptno, Iterable<IntWritable> counts, Context context) throws IOException,
                                                                                          InterruptedException {
        //logic
 
        /*
 
        30,1,1,1,
        50,1,1
 
        Result should look like this
        30 3
        60 2
 
        */
        int deptcount = 0;
        for (IntWritable count : counts) {
            deptcount += 1;
        }
        context.write(deptno, new IntWritable(deptcount));
    }
}
Driver
package gpi.hadoop;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class DeptCount {
 
        public static void main(String[] args) throws Exception {
 
        Configuration conf = new Configuration();
 
 
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: DeptCount [input] [output]");
            System.exit(2);
        }
        else  {
            System.out.println("Call DeptCount with Parameter 1::"+args[1]+" Parameter 2::"+args[2]);
        }
 
        // create a new Configuration
        Job job =  Job.getInstance(conf);
 
        job.setJobName(args[0]);
 
        // Mapper
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(DeptCountMapper.class);
 
 
        // Reducer
        job.setReducerClass(DeptCountReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
 
        // Input and Output Path to the data
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
 
        //main driver Class
        job.setJarByClass(DeptCountT.class);
 
        //set Output Class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        job.submit();
 
    }
}

Beispiel für die Verwendung des Tool Interfaces für die Startklasse:

package gpi.hadoop;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class DeptCountT extends Configured implements Tool {
 
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DeptCountT(), args);
        System.exit(res);
    }
 
    @Override
    public int run(String[] args) throws Exception {
        //get Config object
        Configuration conf = this.getConf();
 
        // create a new Configuration
        Job job = Job.getInstance(conf);
 
        job.setJobName(args[0]);
 
        // Mapper
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(DeptCountMapper.class);
 
 
        // Reducer
        job.setReducerClass(DeptCountReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
 
        // Input and Output Path to the data
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
        //main driver Class
        job.setJarByClass(DeptCount.class);
 
        //set Output Class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        // Execute job and return status
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Jar file erzeugen und auf den Hadoop Server kopieren

Jar File im Oracle JDeveloper

Jar File aufrufen:

yarn jar GpiHadoopExamples.jar gpi.hadoop.DeptCount /user/gpipperr/emp.csv /user/gpipperr/empRun1
 
yarn application -list
 
 
hdfs dfs -cat /user/gpipperr/empRun1/part-r-00000
 
10 3 
20 1
30 4

Hinweise

Wird mit yarn jar <class.jar> <class_name> <in> <out>„ der Job aufgerufen, ist der Parameter 1 der Klassenamen, der Parameter 2 das in Verzeichnis und der Parameter 3 das out Verzeichnis.

D.h. passt aber nicht zu der Verwendung der Eingabe Parameter in den verbreiteten Dokumentationen und Beispielen.

Fehler:

istsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists

Nur wenn in der Jar Datei die Main Klasse definiert wird und ein Aufruf nach diesem Muster durchgeführt wird, passt das im Detail: yarn jar <class.jar> <class_name> <in> <out>„

Ansonsten muss bei Fehler mit den Aufrufparamentern (Typischerweise „Output directory xxx already exists“ ) der Index der Parameter im Code entsprechend angepasst werden.

Weiteres Beispiel

Quellen

Cookies helfen bei der Bereitstellung von Inhalten. Durch die Nutzung dieser Seiten erklären Sie sich damit einverstanden, dass Cookies auf Ihrem Rechner gespeichert werden. Weitere Information
"Autor: Gunther Pipperr"
nosql/mapreduce_hadoop_20_job.txt · Zuletzt geändert: 2014/09/28 22:06 von Gunther Pippèrr