팀 매니저님께서 결과를 보시며 의문을 표하시기에, 조사해봄.

일단 Hadoop Definite Guide 책에서 Combiner Function에 대해 찾아보면,

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function.
from Hadoop Definite Guide 2nd edition, Ch2. MapReduce

이렇게 나와 있다. 대충 정리해보면, map output에 대해 combiner를 적용한 다음, 이 결과가 reduce의 input으로 쓰이며, combiner를 적용함으로써, map에서 reduce로 전송되는 데이터 양을 줄일 수 있다라고 설명하고 있다.

이렇게만 보면, map의 output이 combiner의 input으로 사용되는 것처럼 보이므로, map output records와 combine input records가 일치해야 하는 것처럼 보인다. 그러나 실제로는 combine input records가 더 큰 경우가 많다. (항상 많은지는 모르겠음) 왜 그럴까?

천신만고 끝에 찾은 힌트 하나

After the shuffle each reducer will have one input for each map. The reducer needs to merge these inputs in order to begin processing. It is not efficient to merge too many inputs simultaneously. Thus, if the number of inputs exceeds a certain value, the data will be merged and rewritten to disk before being given to the reducer. During this merge the combiner will be applied in an attempt to reduce the size of the input data.
from Oreilly

나는 지금까지 combiner는 mapper에서만 동작하는지 알았는데, 실제로는 reducer에서 data를 shuffle하는 단계 (정확히는 merge)에서도 combiner가 동작을 한다!

좀 더 자세히 설명을 하면,

image
from Hadoop Definite Guide 2nd edition, Ch6. How MapReduce Works

  1. map의 output을 memory에 쓰다가, 할당된 memory buffer (기본은 100 MB)의 일정 비율 (기본은 80%)을 넘어서면, 현재 memory에 저장된 map의 output을 mapper node의 local file로 쓴다. (이 과정을 spill이라고 함)
  2. 실제 file로 쓰기 전에, 해당 data가 전송될 reducer 단위로 partitioning을 하고, 각각의 partition에 대해 백그라운드 thread가 in-memory sorting을 수행한 다음, sort의 output에 대해 combine 함수가 실행된다.
  3. memory buffer가 spill threashold에 다다르면, 새로운 spill file이 생성되며, 실제 map task가 수행되는 동안, 이러한 spill file이 여러 개 생성된다. map task 가 끝나기 전에, 여러 개의 spill file들을 하나의 파일로 합치게 되는데, 이렇게 하나의 파일로 merge를 하는 단계에서도 combine 함수가 실행된다.
  4. mapper의 결과가 reducer로 copy된 다음, reducer에서 여러 mapper에서 온 결과들을 sort하는 과정을 거치는데, 이 때 각각의 merge 단계에서도 combine 함수가 실행된다.

요약하면, mapper 뿐만이 아니라, reducer 단계에서도 combine 함수가 적용될 수 있으므로, Combine input records의 수가 Map output records의 수 보다 클 수도 있다.

※ 여기서 개발자들이 combiner를 사용할 때 유의해야 할 점은, combiner의 input으로 항상 map의 output이 들어온다 라고 가정을 하면 안 된다는 점이다. 위 설명에서도 알 수 있듯이, combiner의 output이 다시 combiner의 input으로 들어올 수 있다. 그러므로, combiner의 input은 map의 output format 뿐만 아니라, combiner의 output format도 적절히 처리할 수 있어야 한다.

의외로 자료가 별로 없어서.. 직접 소스코드와 로그를 뒤지면서 정리를 해 보았다.

출처 : Hadoop: The Definitive Guide, Second Edition

일단 책에 나와 있는 내용은 저 정도이고, 실제 소스코드를 찾아보면, 책에는 생략된 block management에 대한 내용도 있다. (사실 요 부분이 궁금해서..)
테스트 환경은 hadoop 0.20.5 / pseudo-distribute 모드이며, protocol 자체는 최근에 릴리즈된 hadoop 1.0 버전과 크게 차이는 없을 것이라고 생각된다. 아래는 local에 있는 102 MB (정확히는 106,168,320 byte) 샘플 파일을 hdfs에 upload하는 과정에서 발생한 log (debug log 포함)들을 모아서 정리한 것이다.

node

time

message

comment

Client

21:50:56

hdfs.DFSClient: /test.txt: masked=rwxr-xr-x

Client hdfs://test.txt 라는 파일을 생성하고자 함

Client

21:50:56

hdfs.DFSClient: Allocating new block

Client NameNode에게 block 요청

NameNode

21:50:56,717

*DIR* NameNode.create: file /test.txt for DFSClient_439896788 at 127.0.0.1

Client file을 요청하면, NameNode namespace (메모리)에서 빈 file entry 생성

NameNode

21:50:56,717

DIR* NameSystem.startFile: src=/test.txt, holder=DFSClient_439896788, ClientMachine=127.0.0.1, replication=1, overwrite=false, append=false

FSNameSystem.startFileInternal()

NameNode

21:50:56,718

DIR* FSDirectory.addFile: /test.txt is added to the file system

NameNode

21:50:56,718

DIR* NameSystem.startFile: add /test.txt to namespace for DFSClient_439896788

NameNode

21:50:56,724

ugi=chaehyun    ip=/127.0.0.1   cmd=create  src=/test.txt  dst=null    perm=chaehyun:supergroup:rw-r--r--

FSEditLog.logSync()가 완료 된 뒤 출력된 메시지.
메모리에 추가된 file entry가 정상적으로 EditLog (file)로 기록됨

NameNode

21:50:56,729

*BLOCK* NameNode.addBlock: file /test.txt for DFSClient_439896788

/test.txt 파일을 위한 block을 요청

NameNode

21:50:56,729

BLOCK*NameSystem.getAdditionalBlock: file /test.txt for DFSClient_439896788

해당 file의 내용을 기록할 block 이 block을 저장할 machine list를 반환
이 시점에서 block id와 DataNode list가 결정되며, Client는 DataNode list 중 첫 번째 machine에게 data를 전송

NameNode

21:50:56,729

DIR* FSDirectory.addFile: /test.txt with blk_-1208142459293778377_1012block is added to the in-memory file system

메모리에 저장된 file entry에 할당받은 block 정보들을 추가

NameNode

21:50:56,729

BLOCK* NameSystem.allocateBlock: /test.txt. blk_-1208142459293778377_1012

FSNameSystem.allocateBlock()

Client

21:50:56

hdfs.DFSClient: pipeline = 127.0.0.1:50010

 

Client

21:50:56

hdfs.DFSClient: Connecting to 127.0.0.1:50010

Client DataNode로 접속

Client

21:50:56

hdfs.DFSClient: Send buf size 131072

Client 해당 DataNode data를 packet 단위로 쪼개서 전송 시작

DataNode

21:50:56,750

Receiving block blk_-1208142459293778377_1012 src: /127.0.0.1:6047 dest: /127.0.0.1:50010

DataNode Client로 부터 첫 번째 block을 받기 시작

DataNode

21:50:57,606

PacketResponder 0 for block blk_-1208142459293778377_1012Closing down.

DataNode Client로 부터 첫 번째 block 받기를 끝냄

Nadenode

21:50:57,609

*BLOCK* NameNode.blockReceived: from 127.0.0.1:50010 1 blocks.

DataNode NameNode에게 보고.
"형님 block 잘 받았습니다!"

Nadenode

21:50:57,609

BLOCK* NameSystem.blockReceived: blk_-1208142459293778377_1012 is received from 127.0.0.1:50010

해당 block들을 정상적으로 수신했음을 알림. 
"오~ 니가 이 block들을 받았구나~"

Nadenode

21:50:57,609

BLOCK* NameSystem.addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_-1208142459293778377_1012 size 67108864

NameNode block들을 실제로 저장한 DataNode들의 정보를 저장
"나중에 딴 소리 못 하게 적어놔야지.."

Client

21:50:57

Allocating new block

Client가 두 번째 block을 요청
"아직 데이터가 많이 남았습니다! block 하나 더 주세요!"

Nadenode

21:50:57,610

*BLOCK* NameNode.addBlock: file /test.txt for DFSClient_439896788

Nadenode

21:50:57,610

BLOCK* NameSystem.getAdditionalBlock: file /test.txt for DFSClient_439896788

NameNode에서 추가 block 할당

Nadenode

21:50:57,610

DIR* FSDirectory.addFile: /test.txt with blk_5548041259556473375_1012 block is added to the in-memory file system

Nadenode

21:50:57,610

BLOCK* NameSystem.allocateBlock: /test.txt. blk_5548041259556473375_1012

Client

21:50:57

pipeline = 127.0.0.1:50010


Client

21:50:57

Connecting to 127.0.0.1:50010

Client DataNode packet 단위로 쪼개진 data 전송 시작

DataNode

21:50:57,611

Receiving block blk_5548041259556473375_1012 src: /127.0.0.1:6050 dest: /127.0.0.1:50010

DataNode Client로 부터 두 번째block을 받기 시작

DataNode

21:50:58,034

PacketResponder 0 for block blk_5548041259556473375_1012 Closing down.

DataNode Client로 부터 두 번 째 block 받기를 끝냄

Nadenode

21:50:58,037

*BLOCK* NameNode.blockReceived: from 127.0.0.1:50010 1 blocks.

DataNode NameNode에게 보고

Nadenode

21:50:58,037

BLOCK* NameSystem.blockReceived: blk_5548041259556473375_1012 is received from 127.0.0.1:50010

해당 block들을 정상적으로 수신했음을 알림

Nadenode

21:50:58,037

BLOCK* NameSystem.addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_5548041259556473375_1012 size 39059456

NameNode block들을 실제로 저장한 DataNode들을 저장

Nadenode

21:50:58,038

*DIR* NameNode.complete: /test.txt for DFSClient_439896788

Client가 파일을 다 썼음

Nadenode

21:50:58,038

DIR* NameSystem.completeFile: /test.txt for DFSClient_439896788

file을 구성하는 모든 block들이 최소 replication 이상 복사 되었음을 확인

Nadenode

21:50:58,038

Removing lease on  file /test.txt from Client DFSClient_439896788

Nadenode

21:50:58,038

DIR* FSDirectory.closeFile: /test.txt with 2 blocks is persisted to the file system

Nadenode

21:50:58,038

DIR* NameSystem.completeFile: file /test.txt is closed by DFSClient_439896788

파일 쓰기 완료

  
복잡해 보이니 간단히 요약해보자.

  1. Client에서 파일을 쓰기 위해 NameNode에게 file 생성을 요청을 하면, NameNode는 먼저 메모리에 해당 path에 이미 파일이 존재하는지, Client가 적절한 권한을 가지고 있는지 확인한 다음, 문제가 없으면, namespace 상에서 file entry를 생성한다.
  2. 그런 다음, 실제 file을 저장할 block을 할당하고, Client의 위치와 DataNode들의 저장 상황, 위치 등을 고려하려, 해당 block을 저장할 DataNode 들을 선정한다.
  3. Client는 NameNode로 부터 받은 block 정보를 이용하여, 첫 번째 DataNode에 data를 전송하고, 전송이 정상적으로 완료되면, NameNode에게 다음 block을 요청한다.
  4. NameNode가 다시 block을 할당하면, Client가 file의 다음 내용을 DataNode에게 전송하고, 이 과정을 반복한다.
  5. Client에 있는 file의 모든 data가 DataNode들에게 정상적으로 전송이 끝나고, NameNode가 해당 파일에 속하는 모든 block들이 최소한의 복제 계수만큼 복사가 완료되었다고 판단하면 file write가 종료된다.

그 외 내가 궁금해서 찾아본 내 맘대로 FAQ)

  • Q) NameNode는 block을 어떻게 관리하나?
    A) block은 long으로 구분되며, 이론 상 2^64 개의 block을 생성할 수 있다. block은 계속 생성되고 소멸되므로, block id (long)을 관리하는 일이 중요한데, NameNode에서는 현재 굉장히 simple하게-_- 관리를 한다. 랜덤하게 block id값을 고른 다음, 아직 사용하지 않은 숫자가 나올 때 까지 계속한다. (허무할 정도로 간단하게..)
    참고로, 사용 중인 block들에 대한 정보는 BlocksMap 이라는 class에서 관리하며, 내부에서는 LightWeightGSet 이라는 자체 set class를 만들어서 저장한다.

  • Q) block을 저장할 DataNode들은 어떻게 선발하나요?
    A) hadoop 0.20.5 버전에서는 ReplicationTargetChooser 라는 class가 해당 역할을 수행한다. 현재 버전에서의 복사 전략은 Client가 DataNode일 경우에는 먼저 해당 DataNode가 첫 번째 노드로 선정된다. (이럴 경우, 첫 번째 복사는 network을 타지 않기 때문에 빠르다) 만약 Client가 DataNode가 아니면, 그냥 랜덤하게 고른다.
    그리고 두 번째 node는 첫 번째 DataNode와 다른 rack에 위치한 DataNode들을 고르고, 세 번째 DataNode는 첫 번째 node와 같은 rack에 있는 node를 고른다. 그리고 네 번째 부터는 그냥 랜덤하게 고른다.
    이렇게 함으로써, data 전송을 위한 network traffic을 줄이고, data 안정성을 최대한으로 높일 수 있다. (복제계수가 2이상이면, 최소 하나 이상의 block은 다른 rack에 저장되므로, 한 rack 전체가 날아가더라도, file은 살아 있을 것이다)
    그리고 ReplicationTargetChooser에게 DataNode를 요청할 때는 exclude list를 함께 줘서, 장애가 발생한 DataNode나, 이미 해당 block을 저장하고 있는 DataNode, decommission 중인 DataNode들은 제외되도록 설계되어 있다.


─ tag  hadoop, hdfs, 하둡

예를 들어 hadoop의 TextInputFormat 을 보자. 이 포맷은 하둡의 default input format인데, 파일에서 text들을 라인 단위로 읽어서 map task에게 제공해주는 역할을 한다. (참고로 key는 파일 내에서 각 라인의 시작 지점까지의 바이트 오프셋 값이다.)

여기서 당연히 한 가지 의문이 생기는데, 기본적으로 InputSplit은 HDFS의 블록으로 쪼개진다. (특별히 따로 InputSplit을 정의하지 않았다면) 그럼 HDFS block boundary가 TextInputFormat의 line boundary와 정확히 일치하지 않을 텐데, hadoop에서는 이를 어떻게 처리하고 있을까?

Hadoop: The Definite Guide 책을 보면, 아래와 같은 그림이 나온다.

그리고 설명하기를, block boundary와 input split의 논리적인 단위 (여기서는 line 단위)가 일치하지 않더라도, input split을 잘 계산하여 처리한다고 나와 있다. 이 설명만 놓고 보면, 마치 InputFormat에서 InputSplit을 계산할 때, 이런 논리적인 단위가 나눠지는 것처럼 보인다.

하지만 실제로 InputSplit을 구현한 FileSplit을 코드를 보면, 아래와 같은 정보만을 가지고 있다.

  • private Path file;   // 어떤 file에서 나온 split인가
  • private long start; // file 내에서 해당 input split의 시작 위치
  • private long length;  // input split의 크기
  • private String[] hosts; //  해당 input split을 가지고 있는 data node들의 host name

그리고 이런 FileSplit을 계산해 내는 FileInputFormat 클래스 소스를 살펴보면, 논리적인 단위를 계산하는 부분은 없고, 단지 file을 block boundary 단위로 나누어, FileSplit을 계산한다. 즉, FileSplit이 가지고 있는 정보는, 논리적인 단위가 아닌, 단순히 물리적인 block의 경계에 대한 정보만 가지고 있다. 그렇다면 실제로 block 단위가 아닌, line 단위로 input split을 재계산 해 주는 녀석은 누구일까?

비밀은 바로 RecordReader에 숨어 있다. InputFormat interface는 getSplits() 메소드를 통해 split을 계산해내고, getRecordReader() 메소드를 통해 해당 split을 읽을 수 있는 RecordReader를 반환한다.

그리고 각 MapTask가 실행될 때, map() 메소드를 호출하기 전에 RecordReader의 initialize() 메소드를 한 번 호출한다. 그리고 TextInputFormat이 사용하는 LineRecordReader의 initilize() 메소드를 살펴보면, map task로 전달된 input split이 파일의 첫 번째 block이 아닐 경우, 무조건 첫 번째 line은 skip하도록 구현되어 있다. (start position에 -1을 한 다음, 한 줄을 건너뜀) 이제 저 위 그림이 이해가 될 것이다.

정리를 하면, file을 처리하는 InputSplit은 단순히 물리적인 block 단위 (기본적으로는 hdfs block 단위)로 쪼개져서 map task에게 전달된다. 그리고 각 map task는 이렇게 쪼개진 input split으로부터, 실제로 map() 에서 처리해야 하는 논리적인 record 단위를 계산하고, 이를 바탕으로 InputSplit의 boundary를 다시 계산하여 처리를 하게 되는 것이다.

─ tag  hadoop, inputsplit, 하둡

hadoop은 기본적으로 input split에 의해 계산된 각 split에 대해 하나의 map task를 생성한다. (여담이지만, 실제 map task의 개수는 mapred.map.tasks의 값이나 setNumMapTasks() 에 의해 결정되지 않는다. 자세한 이야기는 여기 참조)

InputSplit은 딱 두 개의 method를 가진 abstract class로 정의되어 있고, 각 method는 다음과 같다.

  1. public abstract long getLength();          // split의 size를 반환
  2. public abstract String[] getLocations(); //split의 data가 저장된 node들을 반환

실제 split을 구현한 class로 FileSplit class가 있다. (DB에 있는 data 역시 input split으로 쓰일 수 있는 모양이지만, 실제로 hadoop에 구현된 class는 없는 듯..)

FileSplit은 아래와 같은 member variable을 가지는데,,

  • private Path file          //파일 이름
  • private long start        // 파일에서 처리할 바이트의 첫 시작 위치
  • private long length      // 처리할 바이트 수
  • private String[] hosts  // 실제 해당 block을 가지고 있는 host들의 list

이러한 InputSplit들은 mapreduce job을 실행하는 단계에서 보면 4. submit job 단계에서 계산이 된다. 즉, JobClient가 JobTracker에게 작업을 제출할 때 계산된다.

image

Firgure 6-1. How Hadoop runs a MapReduce job. 출처 : Hadoop The definite Guide

 

그런데 막상 코드를 보면, 위 그림에서처럼 딱 무 자르듯이 나눠져 있는 건 아니고, 3번 단계에서 job에 필요한 리소스 (jar 파일들, 설정 파일 등등) 들을 hdfs의 특정 경로에 복사를 하는데, 이 특정 경로에 계산된 inputsplit 정보들도 같이 복사가 된다.

그리고 6. retrieve input splits 단계에서는 3~4번 단계에서 복사된 input split가 담긴 파일들을 가져와서 map task에서 넘겨주는 구조임.

input split을 계산하는 단계가 4번인지 6번인지 헷갈려서 조사해봄.

─ tag  hadoop, 하둡

* 정보 공유 차원에서 오늘 한 삽질을 기록으로 남김

  1. 상황
    • map/reduce 프레임웍이 필요한 것은 아니고, 단순하지만 처리하는데 오래 걸리는 작업들을 여러 컴퓨터에 분산하여 처리하고자 함.
    • reduce task 개수를 0으로 주고 mapper only로 설정하여 작업을 시작.
    • 문제 : 현재 hadoop의 scheduler는 preemptive 하지 않기 때문에, 한 번 map task가 시작되면, priority가 낮더라도 해당 map task가 끝날 때 까지 계속 진행됨. 덕분에 나의 단순하고 오래 걸리는 작업들이 한 번 hadoop에서 돌기 시작하면, 다른 긴급한 작업들이 처리되지 못하는 상황이 발생!
    • 시도된 해결책
      • 나의 job에서 동시에 실행되는 map task 개수를 제한하여, 다른 긴급한 작업들이 처리될 수 있도록 함
  2. 삽질.. 삽질..
    • 첫 번째 시도 : mapred.jobtracker.maxtasks.per.job=10 으로 설정
      • 하나의 job 당 최대 task 수를 10으로 설정했으니, 한 번에 실행되는 map task의 수도 제한되지 않을까? 라고 생각했으나,, 실제로는 적용 안됨. ㅜ.ㅜ
    • 두 번 째 시도 : mapred.tasktracker.map.tasks.maximum=1 로 설정
      • 이 설정의 의미는 하나의 task tracker에 의해 동시에 실행되는 map task의 개수
      • 저렇게 설정하면, 하나의 task tracker에 의해서 하나의 map task만 실행되어야 한다. 하지만 실제로는 적용이 안됨.
      • 이 부분에 대해서는 원인을 찾았는데, Hadoop The Definitive Guide라는 책에 보면 아래와 같이 쓰여 있다.
      • Be aware that some properties have no effect when set in the client configuration. For example, if in your job submission you set mapred.tasktracker.map.tasks.maximum with the expectation that it would change the number of task slots for the tasktrackers running your job then you would be disappointed, since this property only is only honored if set in the tasktracker’s mapred-site.html file.

      • 해석하면, mapred.tasktracker.~ 로 실행되는 설정들은 tasktracker daemon에 의해서만 적용되며, client에서 설정한 값은 무시 된다는 말씀.
    • 세 번째 시도 : mapred.map.tasks=10 으로 설정!
      • mapred.map.tasks 설정은 하나의 job 당 map task 의 개수이므로, map task의 개수가 적으면 당연히 동시에 실행되는 map task의 개수도 제한 될 것이라고 생각했지만!! 여전히 적용 안됨.
      • 원인을 찾아본 결과, mapred.map.tasks는 map task의 개수는 늘릴 수 있지만, 하둡이 결정한 map task의 개수 이하로는 줄일 수 없다고 함. 출처 : http://wiki.apache.org/hadoop/HowManyMapsAndReduces
      • 참고로 하둡은 하나의 input split 당 하나의 map task를 생성함. input split이란, 각 input file들을 구성하고 있는 하나 하나의 data block을 뜻함.
    • 발상의 전환 : 굳이 한 번에 실행되는 map task의 개수를 줄일 필요가 있을까?
      • 문제가 되는 상황은 나의 job이 모든 map task slot을 점유하고, 다른 긴급한 job들이 나의 slot을 뺏을 수 없어서 생기는 문제.
      • 그렇다면, 하나의 map task 가 빨리 빨리 종료되도록 단위를 줄이면, 그 사이 다른 priority 가 높은 job들이 종료된 map slot을 차지하여, map task을 실행시킬 수 있게 되어, priority가 역전되는 문제를 회피할 수 있음.
      • map task를 얼마나 늘려야 할까? 심플하게 mapred.map.tasks=100000000 정도로 설정하면, 하나의 map task가 처리하는 단위가 작아지고, 굉장히 빠른 시간에 종료되므로, preemptive 문제는 해결할 수 있지만, 이 경우, input split이 너무 작아질 수 있기 때문에, hadoop에서 배보다 배꼽이 더 큰 상황이 발생할 수 있다.
      • 이때 구세주처럼 등장한 설정이 있으니 바로 mapred.min.split.size ! 하나의 input split의 최소 단위를 설정할 수 있는데, 이 설정을 이용하면, map task가 지나치게 많아지는 것을 방지할 수 있다.
      • 최종 해결책
        • mapred.min.split.size=4194304  (하나의 input split은 최소 4MB 단위로 처리해라)
        • mapred.map.tasks=1000000   (최대한 map task를 잘게 쪼개서, 다른 작업에 방해가 되지 않도록 하자)
        • 실제로 위와 같이 설정하면, 전체 map task가 백 만 개 이하로 적절하게 쪼개져서 동작함. (몇 개로 쪼개지는 지는 전체 input file의 size에 따라 다름)
─ tag  hadoop, 하둡

요즘 업무 때문에 hadoop을 많이 사용하고 있다. 오늘의 삽질 항목은 hadoop에서 iterator를 재사용할 수 없을까? 하는 것이다. hadoop의 reducer는 동일한 key로 묶인 value들을 가져올 수 있도록, Iterable<?> type을 반환해 준다. 그런데 쓰다 보면, 이 reducer의 값들을 두 번 이상 읽고 싶을 때가 있다. 예를 들면, 전체 value들의 개수를 세어서 값들을 출력할 때 함께 찍어주고 싶다거나.. 기타 max, min 값을 구해서 같이 출력 하고 싶다거나 등 은근 필요한 경우가 많다.

처음에는 단순하게 Iterable<text> values 변수를 복사해 뒀다가, 다시 iterator를 호출해서 쓰는 것을 시도했다. 대충 돌려보니 컴파일 오류 안 나고 실행도 잘 되는 듯 했다! 올레를 외치며 대용량 데이터에 적용을 했는데.. 몇 시간 동안 열심히 작업을 하 더니 내 놓은 결과는,,  reducer output records : 0개  -_- 결과 값이 아무 것도 없는.. T_T

구글링을 해 보니, 구현상의 이유로, reducer의 iterator는 두 번 읽을 수 없다고 한다. (요기 참고) 생각해보면, key에 딸린 value들이 어마어마하게 많으면, 파일 블록 하나로 처리가 안될 테니, 여러 파일에서 나눠서 읽어야 할 테고, 여기서 iterator를 다시 호출 할 수 있게 하려면 시스템이 복잡해 질 수도 있겠다 싶었다. (그래도 좀 해주면 안 되려나)

하여간, 이를 극복하기 위해서는,

  1. value들을 모두 list에 따로 저장
  2. value들을 local file에 저장했다 다시 읽기

요 두 가지 방법 정도가 떠올랐다. 처음에 list에 저장하도록 해서 돌렸더니, 여기저기서 out of memory 오류를 뿌리며 죽었다. –_-; 은근 메모리에 다 못 올릴 cluster들이 많은가 보다.

그래서 local file에 저장해서 다시 읽을까 했는데, 작은 cluster에 대해서도 일일이 local file에 저장하고 읽으면 이것도 은근 속도에 bottleneck이 될 듯 하여.. n개 이하는 메모리의 list로 처리하고, n개가 넘어가면, 파일에 저장해서 다시 읽도록 구현해서 돌렸다. 그런데 이게 어디서 오류가 생긴건지 24시간이 지나도록 몇몇 reducer 작업들이 끝나지를 않는다. –_-; 아놔

일단 시간이 없어서, 결국 이번에는 iterator를 한 번 사용해서 결과를 찍고, 결과를 별도로 후처리를 하여 원하는 값을 얻었다. 뭔가 더 깔끔한 방법이 있을 듯 한데,, 방법이 없을까?

─ tag  hadoop, Java, 하둡
openclose