본문으로 바로가기
반응형

하둡의 핵심인 맵 리듀스를 구현하는건 크게 어렵지 않습니다.

1. mapper의 map()을 내가 하고자 하는걸로 override 해준다.

2. reducer의 reduce()를 내가 원하는걸로 override 해준다.

3. 입출력 타입, mapper와 reducer의 클래스가 무언인지등의 정보를 설정해주는 job을 생성하여 실행시킨다.


이 세단계로 구현하면 되기 때문에 하둡의 샘플 예제에도 있는 wordCount를 직접  만들어 보겠습니다.


※ 본 예제는 Kotlin을 사용하였습니다.

먼저 mapper를 구성합니다.

/**
 * Mapper<입력받는 데이터 Key 타입, 입력받는 데이터 value 타입, Mapper의 결과 데이터 Key 타입,  Mapper의 결과 데이터 value 타입
 */
class CountMapper : Mapper<LongWritable, Text, Text, IntWritable>() {
    companion object {
        val resultValue = IntWritable(1)
    }

    private val word = Text()

    @Throws(IOException::class, InterruptedException::class)
    override fun map(key: LongWritable, value: Text, context: Context) {
        // 수신받은 string을 쪼개서 배열로 받음.
        val strings = StringTokenizer(value.toString())
        while (strings.hasMoreTokens()) {
            word.set(strings.nextToken())
            context.write(word, resultValue)
        }
    }

}


map() 함수에 입력되는 값은 아래와 같습니다.

- key: input file의 라인 번호

- value: 읽어들인 text 한줄 (개행문자 \n 까지)


넘겨받은 param을 tokenizer를 이용하여 단어로 자른후에 context에 다시 key와 value값으로 넣습니다.

이때의 output의 key, value는 아래와 같습니다.

- key: 단어 (text)

- value: 1 (나중에 key별로 grouping하여 sum하여 단어의 수를 세기 때문에 무조건 1을 할당 합니다.)


reducer를 구현합니다.

/**
 * Mapper<입력받는 데이터 Key 타입, 입력받는 데이터 value 타입, Reducer의 결과 데이터 Key 타입,  Reducer의 결과 데이터 value 타입
 */
class CountReducer : Reducer() {
    private val resulValue = IntWritable()

    @Throws(IOException::class, InterruptedException::class)
    public override fun reduce(key: Text, values: Iterable, context: Context) {
        val sum = values.map { it.get() }.sum()
        resulValue.set(sum)
        context.write(key, resulValue)
    }
}


reducer를 상속받아 reduce 함수를 override 했습니다.

이때 reduce의 param으로 key와 key가이 가지고 있는 value를 배열로 받습니다.

따라서 key 단위로 value의 배열을 돌아서 모두 더해주도록 합니다.

ex) 입력 파일에 people 라는 단어가 열번 들어 있었다면 key = "people", value = {1,1,1,1,1,1,1,1,1,1} 이 param으로 들어오겠죠?


마지막으로 하둡을 실행시킬 main 함수를 만듭니다.

class WordCount {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            val conf = Configuration()
            // 읽을 파일, 출력폴더명을 argument로 받아야 한다.
            if (args.size != 2) {
                System.err.println("Usage: WordCount <Input> <Output>")
                System.exit(-1)
            }

            // Job을 생성
            val job = Job(conf, "WordCount")

            // 사용할 jar library를 설정 (여기서는 WordCount 임)
            job.setJarByClass(WordCount::class.java)

            // Mapper 클래스를 할당
            job.mapperClass = CountMapper::class.java

            // Recuder 클래스를 할당
            job.reducerClass = CountReducer::class.java

            // 입력으로 사용할 데이터 포멧 할당 (일반적인 text 파일을 source로 사용)
            job.inputFormatClass = TextInputFormat::class.java

            // 최종 결과로 출력될 데이터 포멧 할당 (일반적인 text로 파일을 출력.
            job.outputFormatClass = TextOutputFormat::class.java

            // 출력될 Key와 Value의 데이터 포멧을 결정
            job.outputKeyClass = Text::class.java
            job.outputValueClass = IntWritable::class.java

            // source 입력 경로 설절
            FileInputFormat.addInputPath(job, Path(args[0]))
            // 최종 결과 파일을 저장할 경로 설정
            FileOutputFormat.setOutputPath(job, Path(args[1]))

            // mapreduce 수행.
            job.waitForCompletion(true)
        }
    }
}


main 함수에서는 Job을 생성하여 필요한 정보를 set 후에 waitForCompletion() 함수로 하둡에 연산을 요청합니다.


Client는 필요한 연산을 위한 함수만 override하고 입력/출력관련 데이터 포멧만 결정하면 하둡이 알아서 분산처리하여 결과를 알려줍니다.

예제는 간단합니다만, 실제 구현을 한다면 복잡할순 있습니다만, 어차피 override해야 하는함수는 두개 뿐입니다.


구현이 완료되었으니, jar 파일로 만듭니다.

jar로 만드는 방법은 아래 링크에 설명되어 있습니다.


jar 생성이 완료되면 아래와같은 순서로 수행 시킵니다.


1. input으로 사용할 text를 준비합니다.

과일 이름으로 간단하게 만들어 봤습니다.


2. hdfs에 업로드 합니다.

3. jar 파일을 수행 시킵니다.


4. 연산이 완료되면 out 폴더의 part-r-00000 파일의 생성을 확인합니다.


5. cat으로 파일 내부를 열어보면 단어로 분리되어 개수가 couting되어 나온걸 볼수 있습니다.



반응형