본문으로 바로가기
반응형

앞쪽의 예제에서 configuration을 콘솔에서 적용하는 방법을 설명했습니다.

https://tourspace.tistory.com/232



위 예제에서는 맵리듀스 수행시 -D 옵션으로 worktype 값을 설정하고, map에서 해당 값을 읽어 분리했습니다.

하지만 이럴경우 worktype별로 맵리듀스를 여러번 수행해야합니다.

어차피 데이터를 쭈~욱 로드해서 분류해내는 작업인데, param에 따라 같은 데이터를 몇번씩 읽어서 처리하는건 비효율적이기 때문에 한번에 읽고 출력 파일을 따로 만들수 있는 Multi-output을 제공합니다.


※ 코드는 kotlin으로 작성되었습니다.


Mapper의 구성

먼저 mapper를 아래와 같이 구성합니다.
예제는 BasketWinners라는 농구팀의 점수기록파일이라고 가정합니다.
Mapper의 입력은 <Line번호, 점수> 이고 출력은 변경된 key와 점수 입니다.
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class MultiOutputMapper : Mapper<LongWritable, IntWritable, Text, IntWritable>(){
    // map 출력값
    private val outputValue = IntWritable(1)
    // map 출력키
    private val outputKey = Text()
    private val teamName = "BasketWinners"

    override fun map(key: LongWritable, value: IntWritable, context: Context) {

        val point = value.get()
        when(point) {
            1-> outputKey.set("1,$teamName")
            2-> outputKey.set("2,$teamName")
            3-> outputKey.set("3,$teamName")
        }

        context.write(outputKey, IntWritable(point))
    }
}


Mapper에 키를 만들때 점수에 따라 "1,BasketWinners", "2,BasketWinners", "3,BaskterWinners" 로 구분자를 넣어 key를 생성합니다.


Reducer의 구현

import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class MultiOutputReducer : Reducer<Text, IntWritable, Text, IntWritable>() {

    private val result = IntWritable()
    private lateinit var multiOutput: MultipleOutputs;

    override fun setup(context: Context) {
        multiOutput = MultipleOutputs(context)
    }

    @Throws(IOException::class, InterruptedException::class)
    public override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
        // 콤마로 분리
        val pointType = key.toString().split(",")

        val sum = values.map {it.get()}.sum()
        result.set(sum)

        when (pointType[0]) {
            "1" -> multiOutput.write("1point", Text("1point"), result)
            "2" -> multiOutput.write("2point", Text("2point"), result)
            "3" -> multiOutput.write("3point", Text("3point"), result)
        }
    }

    override fun cleanup(context: Context) {
        multiOutput.close()
    }

}


reducer에서는 key를 ,로 구분하고, 해당 값에 따라 파일을 따로 생성합니다.

이때 setup()에서 MutilpleOutputs()를 생성하고, cleanup에서 close()를 호출해 줍니다.


Main 함수의 구성

마지막으로 main 함수를 생성할때 (job을 생성 및 설정할때) 아래와 같은 형태로 출력형태를 지정해 줍니다.

 MultipleOutputs.addNamedOutput(job, "1point",
        TextOutputFormat::class.java, Text::class.java, IntWritable::class.java
    )
    MultipleOutputs.addNamedOutput(job, "2point", TextOutputFormat::class.java,
        Text::class.java, IntWritable::class.java
    )
    MultipleOutputs.addNamedOutput(job, "2point", TextOutputFormat::class.java,
        Text::class.java, IntWritable::class.java
    )


MultipleOutputs.addNamedOutput() 함수로 다수의 출력 파일을 추가할 수 있습니다.


반응형