Hadoop Streaming 介绍及实践

分享 青牛 ⋅ 于 2016-12-05 19:00:23 ⋅ 4374 阅读

Hadoop Streaming

Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
    -input input \
    -output output \
    -mapper cat \
    -reducer wc

Streaming工作原理

在上面的例子里,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。

如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。不过,这可以定制,在下文中将会讨论如何自定义key和value的切分方式。

如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。在下文中将会讨论如何自定义key和value的切分方式。

这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

Streaming选项与用法

用法

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar streamjob -help
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  -input          <path> DFS input file(s) for the Map step.
  -output         <path> DFS output directory for the Reduce step.
  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.
  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.
  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.
  -file           <file> Optional. File/dir to be shipped in the Job jar file.
                  Deprecated. Use generic option "-files" instead.
  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
                  Optional. The input format class.
  -outputformat   <TextOutputFormat(default)|JavaClassName>
                  Optional. The output format class.
  -partitioner    <JavaClassName>  Optional. The partitioner class.
  -numReduceTasks <num> Optional. Number of reduce tasks.
  -inputreader    <spec> Optional. Input recordreader spec.
  -cmdenv         <n>=<v> Optional. Pass env.var to streaming commands.
  -mapdebug       <cmd> Optional. To run this script when a map task fails.
  -reducedebug    <cmd> Optional. To run this script when a reduce task fails.
  -io             <identifier> Optional. Format to use for input to and output
                  from mapper/reducer commands
  -lazyOutput     Optional. Lazily create Output.
  -background     Optional. Submit the job and don't wait till it completes.
  -verbose        Optional. Print verbose output.
  -info           Optional. Print detailed usage.
  -help           Optional. Print help message.

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

只使用Mapper的作业

有时只需要map函数处理输入数据。这时只需把mapreduce.job.reduces设置为零,Map/reduce框架就不会创建reducer任务,mapper任务的输出就是整个作业的最终输出。

为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-D mapreduce.job.reduces=0”等价。

为作业指定其他插件

和其他普通的Map/Reduce作业一样,用户可以为streaming作业指定其他插件:
-inputformat JavaClassName
-outputformat JavaClassName
-partitioner JavaClassName
-combiner JavaClassName
用于处理输入格式的类要能返回Text类型的key/value对。如果不指定输入格式,则默认会使用TextInputFormat。 因为TextInputFormat得到的key值是LongWritable类型的(其实key值并不是输入文件中的内容,而是value偏移量), 所以key会被丢弃,只把value用管道方式发给mapper。

用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。

为作业指定附加配置参数

用户可以使用“-D =”增加一些配置变量。

 -D mapreduce.job.reduces=0
 -D stream.map.input.field.separator ='\t' #默认输入KV分隔符  \t,可以修改

其他选项

Streaming 作业的其他选项如下表: 选项 可选/必须 描述
-cluster name 可选 在本地Hadoop集群与一个或多个远程集群间切换
-dfs host:port or local 可选 覆盖作业的HDFS配置
-jt host:port or local 可选 覆盖作业的JobTracker配置
-additionalconfspec specfile 可选 用一个类似于hadoop-site.xml的XML文件保存所有配置,从而不需要用多个"-jobconf name=value"类型的选项单独为每个配置变量赋值
-cmdenv name=value 可选 传递环境变量给streaming命令
-cacheFile fileNameURI 可选 指定一个上传到HDFS的文件
-cacheArchive fileNameURI 可选 指定一个上传到HDFS的jar文件,这个jar文件会被自动解压缩到当前工作目录下
-inputreader JavaClassName 可选 为了向下兼容:指定一个record reader类(而不是input format类)
-info 可选 详细输出

注意事项

  1. 如果自己写的脚本和可执行文件在 node的服务器上没有,必须通过 -files 将文件内容分发到node上
  2. 所有脚本必须知道解释器,否则可能报错。默认是shell 。
  3. 开发测试时可以使用脚本pipe测试

    cat input.txt | sort | wc
    cat input.txt | Mapper | sort | Reducer
  4. 一般性参数要放前面,紧跟jar包后面
    Generic options supported are
    -conf <configuration file>     specify an application configuration file
    -D <property=value>            use value for given property
    -fs <local|namenode:port>      specify a namenode
    -jt <local|resourcemanager:port>    specify a ResourceManager
    -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
    -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
    -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

脚本示例

shell版wordcount

mapper.sh

#!/bin/bash
while read LINE; do
  for word in $LINE
  do
    echo -e "$word\t1"
  done
done

reducer.sh

#!/bin/bash
count=0
word=""
while read LINE;do
  fields=($LINE)
  new_word=${fields[0]}
  if [ "$word" != "$new_word" ];then
     if [ $count -ne 0 ]; then
         echo -e "${word}\t${count}"
         count=0
     fi
    word=$new_word
    count=1
  else
    count=$(($count + 1))
  fi
done
echo -e "${word}\t${count}"

测试运行:
测试数据:
input.txt

hello world
hello hadoop

shell 测试:

test@ubuntu:~# cat input.txt |./mapper.sh |sort|./reducer.sh
hadoop  1
hello   2
world   1

stream 测试提交

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \ 
    -files /opt/mapper.sh,/opt/reducer.sh  \
    -input input    \
    -output output  \  
    -mapper mapper.sh  \
    -reducer reducer.sh
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-青牛,http://hainiubl.com/topics/3
本帖由 青牛 于 6年前 解除加精
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter