Hadoop Streaming
Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-input input \
-output output \
-mapper cat \
-reducer wc
Streaming工作原理
在上面的例子里,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。不过,这可以定制,在下文中将会讨论如何自定义key和value的切分方式。
如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。在下文中将会讨论如何自定义key和value的切分方式。
这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。
Streaming选项与用法
用法
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar streamjob -help
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
-input <path> DFS input file(s) for the Map step.
-output <path> DFS output directory for the Reduce step.
-mapper <cmd|JavaClassName> Optional. Command to be run as mapper.
-combiner <cmd|JavaClassName> Optional. Command to be run as combiner.
-reducer <cmd|JavaClassName> Optional. Command to be run as reducer.
-file <file> Optional. File/dir to be shipped in the Job jar file.
Deprecated. Use generic option "-files" instead.
-inputformat <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
Optional. The input format class.
-outputformat <TextOutputFormat(default)|JavaClassName>
Optional. The output format class.
-partitioner <JavaClassName> Optional. The partitioner class.
-numReduceTasks <num> Optional. Number of reduce tasks.
-inputreader <spec> Optional. Input recordreader spec.
-cmdenv <n>=<v> Optional. Pass env.var to streaming commands.
-mapdebug <cmd> Optional. To run this script when a map task fails.
-reducedebug <cmd> Optional. To run this script when a reduce task fails.
-io <identifier> Optional. Format to use for input to and output
from mapper/reducer commands
-lazyOutput Optional. Lazily create Output.
-background Optional. Submit the job and don't wait till it completes.
-verbose Optional. Print verbose output.
-info Optional. Print detailed usage.
-help Optional. Print help message.
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|resourcemanager:port> specify a ResourceManager
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
只使用Mapper的作业
有时只需要map函数处理输入数据。这时只需把mapreduce.job.reduces设置为零,Map/reduce框架就不会创建reducer任务,mapper任务的输出就是整个作业的最终输出。
为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-D mapreduce.job.reduces=0”等价。
为作业指定其他插件
和其他普通的Map/Reduce作业一样,用户可以为streaming作业指定其他插件:
-inputformat JavaClassName
-outputformat JavaClassName
-partitioner JavaClassName
-combiner JavaClassName
用于处理输入格式的类要能返回Text类型的key/value对。如果不指定输入格式,则默认会使用TextInputFormat。 因为TextInputFormat得到的key值是LongWritable类型的(其实key值并不是输入文件中的内容,而是value偏移量), 所以key会被丢弃,只把value用管道方式发给mapper。
用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。
为作业指定附加配置参数
用户可以使用“-D =”增加一些配置变量。
-D mapreduce.job.reduces=0
-D stream.map.input.field.separator ='\t' #默认输入KV分隔符 \t,可以修改
其他选项
Streaming 作业的其他选项如下表: | 选项 | 可选/必须 | 描述 |
---|---|---|---|
-cluster name | 可选 | 在本地Hadoop集群与一个或多个远程集群间切换 | |
-dfs host:port or local | 可选 | 覆盖作业的HDFS配置 | |
-jt host:port or local | 可选 | 覆盖作业的JobTracker配置 | |
-additionalconfspec specfile | 可选 | 用一个类似于hadoop-site.xml的XML文件保存所有配置,从而不需要用多个"-jobconf name=value"类型的选项单独为每个配置变量赋值 | |
-cmdenv name=value | 可选 | 传递环境变量给streaming命令 | |
-cacheFile fileNameURI | 可选 | 指定一个上传到HDFS的文件 | |
-cacheArchive fileNameURI | 可选 | 指定一个上传到HDFS的jar文件,这个jar文件会被自动解压缩到当前工作目录下 | |
-inputreader JavaClassName | 可选 | 为了向下兼容:指定一个record reader类(而不是input format类) | |
-info | 可选 | 详细输出 |
注意事项
- 如果自己写的脚本和可执行文件在 node的服务器上没有,必须通过 -files 将文件内容分发到node上
- 所有脚本必须知道解释器,否则可能报错。默认是shell 。
-
开发测试时可以使用脚本pipe测试
cat input.txt | sort | wc cat input.txt | Mapper | sort | Reducer
- 一般性参数要放前面,紧跟jar包后面
Generic options supported are -conf <configuration file> specify an application configuration file -D <property=value> use value for given property -fs <local|namenode:port> specify a namenode -jt <local|resourcemanager:port> specify a ResourceManager -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath. -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
脚本示例
shell版wordcount
mapper.sh
#!/bin/bash
while read LINE; do
for word in $LINE
do
echo -e "$word\t1"
done
done
reducer.sh
#!/bin/bash
count=0
word=""
while read LINE;do
fields=($LINE)
new_word=${fields[0]}
if [ "$word" != "$new_word" ];then
if [ $count -ne 0 ]; then
echo -e "${word}\t${count}"
count=0
fi
word=$new_word
count=1
else
count=$(($count + 1))
fi
done
echo -e "${word}\t${count}"
测试运行:
测试数据:
input.txt
hello world
hello hadoop
shell 测试:
test@ubuntu:~# cat input.txt |./mapper.sh |sort|./reducer.sh
hadoop 1
hello 2
world 1
stream 测试提交
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-files /opt/mapper.sh,/opt/reducer.sh \
-input input \
-output output \
-mapper mapper.sh \
-reducer reducer.sh