본문으로 바로가기
반응형

하둡은 데이터를 맵핑하여 원하는 형태로 만들고 맵핑된 데이터를 정렬하여 원하는 결과로 reduce 시킵니다.

앞서서는 하둡에서 사용하는 file system인 HDFS를 설명했다면, 실제로 하둡 프로그래밍을 하기 위해서 필요한 맵리듀스에 대해서 얘기합니다.

맵리듀스는 병렬로 분산처리를 지원해주는 Framework 입니다.
사용자는 처리해야할 로직을 잘 구현해 놓고 맵리듀스에 던져주면 알아서 병렬처리를 하여 결과값을 반환해 줍니다.

이글은 "시작하세요! 하둡 프로그래밍"을 참고하였습니다.

대부분의 내용이 생략되고 정리된 내용이므로 자세한 내용은 꼭 책을 구입하여 확인하시기 바랍니다.



맵리듀스의 구성

1. JobTracker
잡트래커는 전체 하둡 클러스터에 하나만 존재하며 주어진 job의 스케쥴링 및 모니터링을 담당합니다.
보통 네임노드에서 담당하나, 잡 트래커용 서버를 따로 둘수도 있습니다.

잡트래커는 아래와 같은 역할을 수행합니다.
  • 맵과 리듀스를 몇개 실행할지 결정
  • 실행을 담당할 TaskTracker를 결정하고 Job을 할당 
  • TaskTracker를 모니터링 하여 장애를 검출시 다른 TaskTracker에 할당

2. TaskTracker
데이터노드에서 실행되는 데몬으로 JobTracker의 요청에 따라 map task와 reduce task를 생성합니다.
요청한 task가 여러개일 경우 task 개수만큼 JVM을 띄워 병렬로 처리합니다.


맵리듀스의 동작

1. InputSplit

HDFS에서 입력데이터를 읽어들입니다.

파일이 하나라도 HDFS에서 설정한 블록 크기 단위로 split을 생성하고 split 당 하나의 mapTask를 생성합니다.

ex) HDFS 블럭 크기가 128MB 이며, 읽어들일 파일이 300MB면 split은 3개 생성됨 (mapTask도 3개 생성됨)

MapTask는 레코드 단위로 데이터를 읽어 사용자가 설정한 Mapper의 map() 함수를 수행합니다.

map() 함수를 거지면 결과는 (key, value) 형태로 저장됩니다.

이때 생성되는 결과는 MapTask가 실행된 서버 내부에 저장되며, job이 완료되면 삭제됩니다. 


2. Shuffle and Sorting

MapTask의 결과를 ReduceTask로 전달하기 위해 정렬하는 단계 입니다.

ReduceTask의 개수만큼 파티션을 만들고 파티셔너는 MapTask의 결과를 각 partition으로 보냅니다.

MapTask 결과의 key에서 hash값을 구하고 hash에 따라 어떤 partition으로 이동될지가 결정됩니다.

shuffle 단계가 완료되면 각 파티션에는 동일한 key를 갖는 값들이 모이게 되고, 각 key들로 sorting됩니다.


3. Reduce

key별로 정렬된 결과를 마지막 형태로 가공하여 hdfs에 저장됩니다.

이때 ReduceTask는 정렬된 결과를 레코드 단위로 읽어 사용자가 정의한 reduce 함수에 전달합니다.

reduce함수로 처리된 결과가 마지막으로 hdfs에 part-xxxxx라는 파일로 저장됩니다.

xxxxx는 00000 부터 1씩 증가하며, 파티션 번호를 나타냅니다.

만약 블럭단위가 128MB 인 경우 그 이하 크기인 파일을 하둡으로 돌린다면 part-00000 하나만 생성되겠죠?


데이터 타입

맵리듀스 프로그램을 작성하기 위한 기본적인 클래스를 설명합니다.

먼저 맵리듀스는 입출력을 모두 key, value 형태로 사용하며, 이런 데이터들은 네트워크를 통해 전송됩니다.
따라서 입출력 데이터들은 이런 맵 형태 및 serialize 가능한 형태로 만들어져야 하며, 이를 위해 WritableComparable interface를 구현해야 합니다.
WritableComparable interface는 이름처럼 Writable과 Comparable interface를 상속받고 있습니다.

따라서 custom 데이터 타입을 사용하기 위해서 WritableComparable를 상속받아 구현한다면, 직렬화와 역직렬화(write(), readFileds()) 함수와 compareTo() 함수를 오버라이드 해야 합니다.

기본적으로 제공하는 타입은 아래와 같습니다.
  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • TextWrapper (UTF8 형태의 문자열)
  • NullWritable (데이터 값이 필요 없을때)

InputFormat

InputSplit을 통하여 MapTask에 데이터가 입력될때 데이터의 타입을 정의합니다.
job에 따로 정의하지 않을 경우 기본값은 TextInputFormat 입니다.

지원하는 타입은 아래와 같습니다.
  • TextInputFormat: \n 기준으로 읽음 key = 라인번호 (LongWritable) | value = 라인내용(Text)
  • KeyValueTextInputFormat: 키값을 라이번호가 아닌 다른 값을 지정 할 때 사용
  • NLineInputFormat: MapTask가 입력받을 텍스트 파일의 라인수를 제한 할 때 사용
  • DelegatingInputFormat: 서로 다른 입력 포멧이 필요할때 각 경로에 작업을 위임 할 때 사용
  • CombineFileInputFormat: 여러개의 파일을 묶어서 split 시킬때 사용
  • SequenceFileInputFormat: SequenceFile을 입력데이터로 쓸때 사용
  • SequenceFileAsBinaryInputFormat: SequenceFile의 키값을 임의의 바이너리 객체로 변환해서 사용
  • SequenceFileAsTextInputFormat: SequenceFile의 키값을 Text 객체로 변환해서 사용


OutputFormat

ReduceTask에서 reduce가 끝나고 출력할 데이터 포멧을 나타냅니다.
job에서 setter를 이용하여 값을 설정하나, 미설정시 TextOutputFormat 을 기본값을 사용합니다.

모든 데이터 타입은 최상위 클래스인 OutputFormat과 이를 상속받은 FileOutputFormat을 상속받아 구현되어 있습니다.

지원하는 타입은 아래와 같습니다.
  • TextOutputFormat: 텍스트 파일에 레코드를 출력 key, value는 탭으로 구분함
  • SequenceFileOutputFormat: SequenceFile을 출력 할 때 사용
  • SequenceFileByBinaryOutputFormat: SequenceFile에 key, value를 binary 값으로 씀.
  • FilterOutputFormat OutputFormat의 클래스를 편하게 사용할 수 있는 메서드 제공
  • LazyOutputFormat: FileOutputFormat를 상속받은 다른 타입들을 출력내용이 없더라도 출력 파일(part-xxxxx)를 생성하나, 레코드가 파티션으로 보내질때만 출력 파일을 생성함
  • NullOutputFormat: 출력파일이 없을때 사용

Sequence file

데이터를 키와 값 형태로 저장하는 바이너리 파일 입니다.
Writer, Reader, Sorter를 제공하며, Writer를 이용하여 무압축, 레코드 압축, 블럭 압축을 할수 있습니다. 
다양한 압축 코덱을 이용할 수 있으며, 압축이 되더라도 Split이 가능합니다.
(Text의 파일의 경우 압축 코덱에 따라 split이 되지 않을수 도 있습니다.)


반응형