반응형
앞쪽의 예제에서 configuration을 콘솔에서 적용하는 방법을 설명했습니다.
https://tourspace.tistory.com/232
위 예제에서는 맵리듀스 수행시 -D 옵션으로 worktype 값을 설정하고, map에서 해당 값을 읽어 분리했습니다.
하지만 이럴경우 worktype별로 맵리듀스를 여러번 수행해야합니다.
어차피 데이터를 쭈~욱 로드해서 분류해내는 작업인데, param에 따라 같은 데이터를 몇번씩 읽어서 처리하는건 비효율적이기 때문에 한번에 읽고 출력 파일을 따로 만들수 있는 Multi-output을 제공합니다.
※ 코드는 kotlin으로 작성되었습니다.
Mapper의 구성
먼저 mapper를 아래와 같이 구성합니다.
예제는 BasketWinners라는 농구팀의 점수기록파일이라고 가정합니다.
Mapper의 입력은 <Line번호, 점수> 이고 출력은 변경된 key와 점수 입니다.
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class MultiOutputMapper : Mapper<LongWritable, IntWritable, Text, IntWritable>(){
// map 출력값
private val outputValue = IntWritable(1)
// map 출력키
private val outputKey = Text()
private val teamName = "BasketWinners"
override fun map(key: LongWritable, value: IntWritable, context: Context) {
val point = value.get()
when(point) {
1-> outputKey.set("1,$teamName")
2-> outputKey.set("2,$teamName")
3-> outputKey.set("3,$teamName")
}
context.write(outputKey, IntWritable(point))
}
}
Mapper에 키를 만들때 점수에 따라 "1,BasketWinners", "2,BasketWinners", "3,BaskterWinners" 로 구분자를 넣어 key를 생성합니다.
Reducer의 구현
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class MultiOutputReducer : Reducer<Text, IntWritable, Text, IntWritable>() {
private val result = IntWritable()
private lateinit var multiOutput: MultipleOutputs;
override fun setup(context: Context) {
multiOutput = MultipleOutputs(context)
}
@Throws(IOException::class, InterruptedException::class)
public override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
// 콤마로 분리
val pointType = key.toString().split(",")
val sum = values.map {it.get()}.sum()
result.set(sum)
when (pointType[0]) {
"1" -> multiOutput.write("1point", Text("1point"), result)
"2" -> multiOutput.write("2point", Text("2point"), result)
"3" -> multiOutput.write("3point", Text("3point"), result)
}
}
override fun cleanup(context: Context) {
multiOutput.close()
}
}
reducer에서는 key를 ,로 구분하고, 해당 값에 따라 파일을 따로 생성합니다.
이때 setup()에서 MutilpleOutputs()를 생성하고, cleanup에서 close()를 호출해 줍니다.
Main 함수의 구성
마지막으로 main 함수를 생성할때 (job을 생성 및 설정할때) 아래와 같은 형태로 출력형태를 지정해 줍니다.
MultipleOutputs.addNamedOutput(job, "1point",
TextOutputFormat::class.java, Text::class.java, IntWritable::class.java
)
MultipleOutputs.addNamedOutput(job, "2point", TextOutputFormat::class.java,
Text::class.java, IntWritable::class.java
)
MultipleOutputs.addNamedOutput(job, "2point", TextOutputFormat::class.java,
Text::class.java, IntWritable::class.java
)
MultipleOutputs.addNamedOutput() 함수로 다수의 출력 파일을 추가할 수 있습니다.
반응형
'개발이야기 > Hadoop' 카테고리의 다른 글
[Hadoop] 하둡 - 카운터(counter)의 사용 (0) | 2019.09.27 |
---|---|
[Hadoop] 하둡 - 사용자 정의 옵션 GenericOptionParser, Tool, ToolRunner (0) | 2019.09.26 |
[Hadoop] 하둡 - 맵 리듀스(MapReduce)의 수행 단계 (0) | 2019.09.24 |
[Hadoop] 하둡 - 맵리듀스의 기초, 기본 예제 #2 (0) | 2019.09.23 |
[Hadoop] 하둡 - 맵리듀스의(MapReduce) 기초 #1 (0) | 2019.09.20 |