팀 매니저님께서 결과를 보시며 의문을 표하시기에, 조사해봄.

일단 Hadoop Definite Guide 책에서 Combiner Function에 대해 찾아보면,

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function.
from Hadoop Definite Guide 2nd edition, Ch2. MapReduce

이렇게 나와 있다. 대충 정리해보면, map output에 대해 combiner를 적용한 다음, 이 결과가 reduce의 input으로 쓰이며, combiner를 적용함으로써, map에서 reduce로 전송되는 데이터 양을 줄일 수 있다라고 설명하고 있다.

이렇게만 보면, map의 output이 combiner의 input으로 사용되는 것처럼 보이므로, map output records와 combine input records가 일치해야 하는 것처럼 보인다. 그러나 실제로는 combine input records가 더 큰 경우가 많다. (항상 많은지는 모르겠음) 왜 그럴까?

천신만고 끝에 찾은 힌트 하나

After the shuffle each reducer will have one input for each map. The reducer needs to merge these inputs in order to begin processing. It is not efficient to merge too many inputs simultaneously. Thus, if the number of inputs exceeds a certain value, the data will be merged and rewritten to disk before being given to the reducer. During this merge the combiner will be applied in an attempt to reduce the size of the input data.
from Oreilly

나는 지금까지 combiner는 mapper에서만 동작하는지 알았는데, 실제로는 reducer에서 data를 shuffle하는 단계 (정확히는 merge)에서도 combiner가 동작을 한다!

좀 더 자세히 설명을 하면,

image
from Hadoop Definite Guide 2nd edition, Ch6. How MapReduce Works

  1. map의 output을 memory에 쓰다가, 할당된 memory buffer (기본은 100 MB)의 일정 비율 (기본은 80%)을 넘어서면, 현재 memory에 저장된 map의 output을 mapper node의 local file로 쓴다. (이 과정을 spill이라고 함)
  2. 실제 file로 쓰기 전에, 해당 data가 전송될 reducer 단위로 partitioning을 하고, 각각의 partition에 대해 백그라운드 thread가 in-memory sorting을 수행한 다음, sort의 output에 대해 combine 함수가 실행된다.
  3. memory buffer가 spill threashold에 다다르면, 새로운 spill file이 생성되며, 실제 map task가 수행되는 동안, 이러한 spill file이 여러 개 생성된다. map task 가 끝나기 전에, 여러 개의 spill file들을 하나의 파일로 합치게 되는데, 이렇게 하나의 파일로 merge를 하는 단계에서도 combine 함수가 실행된다.
  4. mapper의 결과가 reducer로 copy된 다음, reducer에서 여러 mapper에서 온 결과들을 sort하는 과정을 거치는데, 이 때 각각의 merge 단계에서도 combine 함수가 실행된다.

요약하면, mapper 뿐만이 아니라, reducer 단계에서도 combine 함수가 적용될 수 있으므로, Combine input records의 수가 Map output records의 수 보다 클 수도 있다.

※ 여기서 개발자들이 combiner를 사용할 때 유의해야 할 점은, combiner의 input으로 항상 map의 output이 들어온다 라고 가정을 하면 안 된다는 점이다. 위 설명에서도 알 수 있듯이, combiner의 output이 다시 combiner의 input으로 들어올 수 있다. 그러므로, combiner의 input은 map의 output format 뿐만 아니라, combiner의 output format도 적절히 처리할 수 있어야 한다.

Hadoop World 2011에서 발표되었던 HDFS Federation 요약

video를 보고 이해한 거 + 대략적인 소스 분석을 토대로 한 거라 틀린 부분이 있을 수도 있음

한 마디로 요약하면? 하나의 cluster에서 여러 namespace (namenode)를 쓸 수 있도록 한 것! 왜 이런 걸 했는지, 어떻게 했는지, 그리고 앞으로는 어떻게 발전할 것인지에 대해 소개함.

  1. 현재 HDFS Architecture에 대한 요약 : 자세한 내용은 여기 참조
    1. Namespace와 Block Storage의 조합
    2. Namespace : directory, file, block으로 구성 / file 및 directory에 대한 create, delete, modify, list를 지원
    3. Block Storage : Datanode membership 관리 / block에 대한 create, delete, modify, get 을 지원 / 복제 계수 관리
    4. 현재는 cluster 전체에 하나의 Namespace Volume이 존재, namespace는 하나의 namenode에 올라감
      1. 전체 namespace는 namenode의 메모리에 올라가며, block file들은 datanode의 local file system에 저장됨
  2. 무엇이 문제인가?
    1. Scalability : datanode만 추가하면, 용량은 쉽게 증가하지만, namespace자체는 그렇지 않음
      1. 메모리의 한계로, namespace의 file와 directory 개수에 제한이 있음
        1. 64 GB 메모리에 약 2억 5천 만개의 파일과 block들이 저장
      2. 여전히 매우 큰 클러스터라고 할 수 있지만.. (Facebook은 약 70 PB를 저장함)
    2. Performance : namenode가 하나이므로, 당연히 throughput에 제약이 있음 (한 대가 아무리 좋아 봐야 몰리면.. 뭐..)
      1. 120,000 read ops / sec & 6,000 write ops / sec (write를 할 때는 file로 edit log를 떨궈야 하기 때문에 느림)
      2. 코드 최적화를 통해 20,000 write ops / sec 정도까지는 쉽게 올릴 수 있음
      3. 아무리 최적화를 하더라도, 한 대로 서비스를 하면 bottleneck이 생길 수 밖에 없음
    3. Poor Isolation : 모든 사람/application이 하나의 namespace를 쓰는 문제
      1. 최적화가 덜된 실험적인 job과 상용 job이 하나의 namespace를 사용하는 것은 문제
      2. production cluster에서 하나의 테스트 job이 파일을 어마어마하게 생산하고 지우는 무식한 작업을 한다면, 다른 중요한 job들이 실행되는데도 영향을 끼침
    4. Tight Coupling : 따지고 보면, Namespace와 Block 관리는 서로 다른 분리된 서비스들임. 그런데 엮여 있다 보니 scaling하기 복잡함.
      1. 현재는 모두 namenode 관련 코드로 묶여있음
      2. 두 가지를 분리하여, block 관리도 단순화하고, namespace도 단순화하자!
      3. 궁극적으로 Block Storage 서비스를 일반화 하자.
        1. 이 경우, namespace는 block storage를 사용하는 하나의 어플리케이션이 되는 것
        2. 그 외 HBase 등도 block storage 서비스를 사용할 수 있음
  3. 사실 scaling 보다는 isolation이 더 문제임
    1. namenode에 64 GB 메모리만 꼽으면, 대부분의 cluster에서는 큰 문제가 안됨 (HDFS 특성 상 2억 개를 넘는 file을 저장할 일이 있을까?)
    2. 하지만, 다른 종류의 application들에게 독립적인 namespace를 제공해주는 것은  cluster의 크기가 작더라도 필요함
  4. HDFS Federation
    1. 하나의 cluster에 여러 개의 독립된 namenode들과 namespace volume이 존재
    2. Block Storage는 일반적인 storage service 처럼 사용됨
      1. 1423 번 block에 xxx~~라는 내용을 저장하고, 이 block을 세 개 정도 안전하게 복사해주세요!
      2. 가장 가까이 있는 서버에서 2698 번 block을 읽고 싶어요!
      3. 이런 느낌?
    3. Block Pool
      1. 하나의 namespace는 하나의 block pool을 가짐
      2. 이전에는 datanode에게 1234 번 block에 저장해주세요~ 라고 요청했다면, 이제는 9876번 block pool (namespace 별로 unique하게 생성됨)에 1234번 block에 저장해주세요~ 라고 변경됨
        1. 즉, namenode와 datanode 사이에 block pool이라는 layer가 하나 더 생긴 것
      3. 기존의 block은 block id (long)로 구분되지만, block pool에서 사용되는 ExtendedBlock은 pool id (string)와 block id (long)로 구분됨
        1. pool id는 "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis(); 와 같이 생성되며 (NNStorage.java의 newBlockPoolID() 참조), 하나의 namespace가 고유의 block pool id를 가짐
      4. datanode들은 모든 namenode의 namespace에 소속되며, (몇 개의 datanode가 하나의 namenode에 종속되는 개념이 아님) 모든 namespace의 block들을 독립적으로 저장.
        1. 이전 :  /dfs/data/current/blk_-291620265307745070 와 같이 저장
        2. 0.23이후 : /dfs/current/block_pool_id/current/blk_-291620265307745070 와 같이 저장 (수정필요)
  5. 좋은 점은?
    1. 분산된 namespace : 여러 개의 namenode들로 분산됨
      1. 독립된 master (namenode)구조이므로 단순하고 robust함
      2. namenode code는 거의 고치지 않았으므로, 이전의 namenode 안정성을 그대로 유지
    2. Block Pool이 하나의 일반적인 storage service로 제공됨
      1. 각각의 namespace volume은 독립적임 (하나의 namenode가 죽어도, 다른 namenode에는 영향이 없음)
      2. 새로운 innovation와 빠른 발전을 가져올 것임
        1. 예컨대, 다양한 garbarge collection scheme이 적용된 tmp storage 라던가
        2. 좀 더 효율적인 distributed cache 라던가
        3. small object storage 등등..
  6. 좀 더 상세한 사항
    1. Namenode code는 거의 안 고치고, 대부분 datanode code와 config, tool code들이 수정됨
    2. core 개발은 4개월 만에 끝남
    3. namespace와 block management는 여전히 namenode에 남아 있음
      1. 미래에는 block management 부분은 namenode 밖으로 나갈 수도 있음
    4. 현존하는 cluster 배포에도 큰 영향이 없도록, 예전 설정으로도 동작이 가능함
    5. Cluster Web UI가 multi namespace를 지원할 수 있도록 수정됨
    6. 각종 tool들이 수정됨
      1. datanode들이 여러 namespace에서 decommission이 가능하도록 변경
      2. balancer가 여러 namespace에서 동작하도록 변경
        1. datanode들의 저장용량 뿐만 아니라, block pool storage에서도 balancing 이 가능
    7. federated cluster에서 namenode를 추가/삭제할 때, 전체 cluster 재시작 없이 가능
    8. 하나의 설정 파일로 namenode와 datanode 모두에 적용
  7. Namespace 관리
    1. /data/test.txt 파일은 어느 namespace를 봐야 할까?
    2. global namespace는 존재하지 않음
    3. client에 mount table이 존재함

      1. e.g.) /data/ –> NS1
        /project/ –> NS2
        /home/ –> NS3
        위와 같은 mount table이 설정 파일로 존재
  8. Next Step
    1. namespace와 block management layer의 완전한 분리
    2. 좀 더 나은 scalability를 위해 namespace에서 부분적인 내용 (e.g. 자주 access되는 file)만 memory에 올리고 나머지는 파일로 저장
    3. 실제 data copy (datanode들 사이의 block copy)없이 하나의 namespace를 다른 namespace로 이동

좀 더 많은 사항이 궁금하신 분들을 위해.. 아래의 HDFS 상위 설계서 일독을 추천합니다. 미소

https://issues.apache.org/jira/secure/attachment/12453067/high-level-design.pdf

의외로 자료가 별로 없어서.. 직접 소스코드와 로그를 뒤지면서 정리를 해 보았다.

출처 : Hadoop: The Definitive Guide, Second Edition

일단 책에 나와 있는 내용은 저 정도이고, 실제 소스코드를 찾아보면, 책에는 생략된 block management에 대한 내용도 있다. (사실 요 부분이 궁금해서..)
테스트 환경은 hadoop 0.20.5 / pseudo-distribute 모드이며, protocol 자체는 최근에 릴리즈된 hadoop 1.0 버전과 크게 차이는 없을 것이라고 생각된다. 아래는 local에 있는 102 MB (정확히는 106,168,320 byte) 샘플 파일을 hdfs에 upload하는 과정에서 발생한 log (debug log 포함)들을 모아서 정리한 것이다.

node

time

message

comment

Client

21:50:56

hdfs.DFSClient: /test.txt: masked=rwxr-xr-x

Client hdfs://test.txt 라는 파일을 생성하고자 함

Client

21:50:56

hdfs.DFSClient: Allocating new block

Client NameNode에게 block 요청

NameNode

21:50:56,717

*DIR* NameNode.create: file /test.txt for DFSClient_439896788 at 127.0.0.1

Client file을 요청하면, NameNode namespace (메모리)에서 빈 file entry 생성

NameNode

21:50:56,717

DIR* NameSystem.startFile: src=/test.txt, holder=DFSClient_439896788, ClientMachine=127.0.0.1, replication=1, overwrite=false, append=false

FSNameSystem.startFileInternal()

NameNode

21:50:56,718

DIR* FSDirectory.addFile: /test.txt is added to the file system

NameNode

21:50:56,718

DIR* NameSystem.startFile: add /test.txt to namespace for DFSClient_439896788

NameNode

21:50:56,724

ugi=chaehyun    ip=/127.0.0.1   cmd=create  src=/test.txt  dst=null    perm=chaehyun:supergroup:rw-r--r--

FSEditLog.logSync()가 완료 된 뒤 출력된 메시지.
메모리에 추가된 file entry가 정상적으로 EditLog (file)로 기록됨

NameNode

21:50:56,729

*BLOCK* NameNode.addBlock: file /test.txt for DFSClient_439896788

/test.txt 파일을 위한 block을 요청

NameNode

21:50:56,729

BLOCK*NameSystem.getAdditionalBlock: file /test.txt for DFSClient_439896788

해당 file의 내용을 기록할 block 이 block을 저장할 machine list를 반환
이 시점에서 block id와 DataNode list가 결정되며, Client는 DataNode list 중 첫 번째 machine에게 data를 전송

NameNode

21:50:56,729

DIR* FSDirectory.addFile: /test.txt with blk_-1208142459293778377_1012block is added to the in-memory file system

메모리에 저장된 file entry에 할당받은 block 정보들을 추가

NameNode

21:50:56,729

BLOCK* NameSystem.allocateBlock: /test.txt. blk_-1208142459293778377_1012

FSNameSystem.allocateBlock()

Client

21:50:56

hdfs.DFSClient: pipeline = 127.0.0.1:50010

 

Client

21:50:56

hdfs.DFSClient: Connecting to 127.0.0.1:50010

Client DataNode로 접속

Client

21:50:56

hdfs.DFSClient: Send buf size 131072

Client 해당 DataNode data를 packet 단위로 쪼개서 전송 시작

DataNode

21:50:56,750

Receiving block blk_-1208142459293778377_1012 src: /127.0.0.1:6047 dest: /127.0.0.1:50010

DataNode Client로 부터 첫 번째 block을 받기 시작

DataNode

21:50:57,606

PacketResponder 0 for block blk_-1208142459293778377_1012Closing down.

DataNode Client로 부터 첫 번째 block 받기를 끝냄

Nadenode

21:50:57,609

*BLOCK* NameNode.blockReceived: from 127.0.0.1:50010 1 blocks.

DataNode NameNode에게 보고.
"형님 block 잘 받았습니다!"

Nadenode

21:50:57,609

BLOCK* NameSystem.blockReceived: blk_-1208142459293778377_1012 is received from 127.0.0.1:50010

해당 block들을 정상적으로 수신했음을 알림. 
"오~ 니가 이 block들을 받았구나~"

Nadenode

21:50:57,609

BLOCK* NameSystem.addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_-1208142459293778377_1012 size 67108864

NameNode block들을 실제로 저장한 DataNode들의 정보를 저장
"나중에 딴 소리 못 하게 적어놔야지.."

Client

21:50:57

Allocating new block

Client가 두 번째 block을 요청
"아직 데이터가 많이 남았습니다! block 하나 더 주세요!"

Nadenode

21:50:57,610

*BLOCK* NameNode.addBlock: file /test.txt for DFSClient_439896788

Nadenode

21:50:57,610

BLOCK* NameSystem.getAdditionalBlock: file /test.txt for DFSClient_439896788

NameNode에서 추가 block 할당

Nadenode

21:50:57,610

DIR* FSDirectory.addFile: /test.txt with blk_5548041259556473375_1012 block is added to the in-memory file system

Nadenode

21:50:57,610

BLOCK* NameSystem.allocateBlock: /test.txt. blk_5548041259556473375_1012

Client

21:50:57

pipeline = 127.0.0.1:50010


Client

21:50:57

Connecting to 127.0.0.1:50010

Client DataNode packet 단위로 쪼개진 data 전송 시작

DataNode

21:50:57,611

Receiving block blk_5548041259556473375_1012 src: /127.0.0.1:6050 dest: /127.0.0.1:50010

DataNode Client로 부터 두 번째block을 받기 시작

DataNode

21:50:58,034

PacketResponder 0 for block blk_5548041259556473375_1012 Closing down.

DataNode Client로 부터 두 번 째 block 받기를 끝냄

Nadenode

21:50:58,037

*BLOCK* NameNode.blockReceived: from 127.0.0.1:50010 1 blocks.

DataNode NameNode에게 보고

Nadenode

21:50:58,037

BLOCK* NameSystem.blockReceived: blk_5548041259556473375_1012 is received from 127.0.0.1:50010

해당 block들을 정상적으로 수신했음을 알림

Nadenode

21:50:58,037

BLOCK* NameSystem.addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_5548041259556473375_1012 size 39059456

NameNode block들을 실제로 저장한 DataNode들을 저장

Nadenode

21:50:58,038

*DIR* NameNode.complete: /test.txt for DFSClient_439896788

Client가 파일을 다 썼음

Nadenode

21:50:58,038

DIR* NameSystem.completeFile: /test.txt for DFSClient_439896788

file을 구성하는 모든 block들이 최소 replication 이상 복사 되었음을 확인

Nadenode

21:50:58,038

Removing lease on  file /test.txt from Client DFSClient_439896788

Nadenode

21:50:58,038

DIR* FSDirectory.closeFile: /test.txt with 2 blocks is persisted to the file system

Nadenode

21:50:58,038

DIR* NameSystem.completeFile: file /test.txt is closed by DFSClient_439896788

파일 쓰기 완료

  
복잡해 보이니 간단히 요약해보자.

  1. Client에서 파일을 쓰기 위해 NameNode에게 file 생성을 요청을 하면, NameNode는 먼저 메모리에 해당 path에 이미 파일이 존재하는지, Client가 적절한 권한을 가지고 있는지 확인한 다음, 문제가 없으면, namespace 상에서 file entry를 생성한다.
  2. 그런 다음, 실제 file을 저장할 block을 할당하고, Client의 위치와 DataNode들의 저장 상황, 위치 등을 고려하려, 해당 block을 저장할 DataNode 들을 선정한다.
  3. Client는 NameNode로 부터 받은 block 정보를 이용하여, 첫 번째 DataNode에 data를 전송하고, 전송이 정상적으로 완료되면, NameNode에게 다음 block을 요청한다.
  4. NameNode가 다시 block을 할당하면, Client가 file의 다음 내용을 DataNode에게 전송하고, 이 과정을 반복한다.
  5. Client에 있는 file의 모든 data가 DataNode들에게 정상적으로 전송이 끝나고, NameNode가 해당 파일에 속하는 모든 block들이 최소한의 복제 계수만큼 복사가 완료되었다고 판단하면 file write가 종료된다.

그 외 내가 궁금해서 찾아본 내 맘대로 FAQ)

  • Q) NameNode는 block을 어떻게 관리하나?
    A) block은 long으로 구분되며, 이론 상 2^64 개의 block을 생성할 수 있다. block은 계속 생성되고 소멸되므로, block id (long)을 관리하는 일이 중요한데, NameNode에서는 현재 굉장히 simple하게-_- 관리를 한다. 랜덤하게 block id값을 고른 다음, 아직 사용하지 않은 숫자가 나올 때 까지 계속한다. (허무할 정도로 간단하게..)
    참고로, 사용 중인 block들에 대한 정보는 BlocksMap 이라는 class에서 관리하며, 내부에서는 LightWeightGSet 이라는 자체 set class를 만들어서 저장한다.

  • Q) block을 저장할 DataNode들은 어떻게 선발하나요?
    A) hadoop 0.20.5 버전에서는 ReplicationTargetChooser 라는 class가 해당 역할을 수행한다. 현재 버전에서의 복사 전략은 Client가 DataNode일 경우에는 먼저 해당 DataNode가 첫 번째 노드로 선정된다. (이럴 경우, 첫 번째 복사는 network을 타지 않기 때문에 빠르다) 만약 Client가 DataNode가 아니면, 그냥 랜덤하게 고른다.
    그리고 두 번째 node는 첫 번째 DataNode와 다른 rack에 위치한 DataNode들을 고르고, 세 번째 DataNode는 첫 번째 node와 같은 rack에 있는 node를 고른다. 그리고 네 번째 부터는 그냥 랜덤하게 고른다.
    이렇게 함으로써, data 전송을 위한 network traffic을 줄이고, data 안정성을 최대한으로 높일 수 있다. (복제계수가 2이상이면, 최소 하나 이상의 block은 다른 rack에 저장되므로, 한 rack 전체가 날아가더라도, file은 살아 있을 것이다)
    그리고 ReplicationTargetChooser에게 DataNode를 요청할 때는 exclude list를 함께 줘서, 장애가 발생한 DataNode나, 이미 해당 block을 저장하고 있는 DataNode, decommission 중인 DataNode들은 제외되도록 설계되어 있다.


─ tag  hadoop, hdfs, 하둡

Hadoop World 2011에서 Arun 아저씨가 발표한 Apache Hadoop 0.23 자료 정리

발표 video : http://www.cloudera.com/videos/hadoop-world-2011-presentation-video-apache-hadoop-0.23

  1. Hortonworks에서 일하는 Arun 아저씨는 Apache Hadoop PMC (Project Management Committe)의 Chair임. 그리고 hadoop 0.23의 release manager
  2. hadoop에 대한 간단한 역사
    • nutch project에서 시작. 2006년 hadoop project를 시작한 Doug Cutting 아저씨를 yahoo에서 채용
    • 초기 버전은 매달 릴리즈 (0.1, 0.2…)
    • 0.15~0.20까지는 분기마다 릴리즈
    • hadoop 0.20 버전은 여전히 현재의 stable한 버전임
    • 2011년 5월에 security를 추가한 0.20.203 버전이 릴리즈 (내가 한 일을 남들이 모르게 하라..)
    • 2011년 10월에 hbase를 위한 몇 가지 기능을 추가하여 0.20.205 버전이 릴리즈
  3. hadoop 0.23 버전은 30개월 만에 첫 stable 버전 release가 될 예정
  4. 간만에 major feature 추가
    • HDFS Federation : scaling 문제를 해결하기 위해 multi name node 도입, namespace관리와 block 관리를 분리
    • 차세대 map reduce인 YARN 도입
    • 전반적으로 두 배 이상 성능 향상 / local read일 경우, hdfs protocol을 타지 않고 빠르게 읽도록 수정 (for Hbase) / shuffle의 성능이 30% 이상 향상
    • HDFS의 name node의 고가용성 확보 (name node가 죽어도 어떻게든 수습하자)
    • 기타 HBase를 위한 pipline 개선 / fulle mavenization / name node의 Edit log를 좀 더 단순하고 튼튼하게 수정
  5. 배포 목표 
    • 궁극적인 목표는 하나의 data center의 컴퓨터들(약 2.5~3만대)을 하나의 cluster로 묶는 것
    • 0.23에서는 6천대의 node가 하나의 cluster로 동작하는 것이 목표 (현재는 4천대 정도 인듯?)
    • 현재 (0.20) 4만 concurrent task / cluster –> 10만 개 이상의 concurrent task / cluster
    • 현재 (0.20) 3~4천 개의 concurrent job / cluster -> 만 개의 concurrent job / cluster
    • 잡담하나 : Yahoo는 5만대 이상의 machine들을 less then 1 full time employee가 관리한다고.. 이것이 hadoop의 cool한 점이라고 자랑함
  6. 이제 그럼 무엇을 더 해야 하는가?
    • 테스트를 많이 한다. 최소한 마지막 버전만큼의 성능은 나오도록
    • HBase, Pig, Hive, Oozie와의 intgration test도 한다
  7. 왜 Test가 어렵나?
    • 단순한 api가 아님. map reduce는 굉장히 넓은 범위의 api
    • hadoop streaming까지 들어가면, 이건 거의 unix 수준 (이라고 들었는데 제대로 들은 건가?)
    • 매일밤 MapReduce 하나를 위해, 거의 천 여 개의 functional test가 돌아감
    • 그리고 Pig/Hive를 위한 수 백 개의 테스트도 돌아감
    • Scale test는 기계가 많이 없어서.. 200~400대로 2000대를 simulation 하여 테스트한다
    • 얼마나 오래 가는지도 테스트하고, stress test도 열심히 한다
  8. Benchmarks
    • HDFS와 Map Reduce pipeline의 모든 부분들을 벤치마크 (HDFS에서 읽고 쓰는 throughput, name node의 성능 등)
    • GridMixv3 (hadoop cluster들을 위한 benchmark tool) : Macro benchmark로 이해하면 ok. 만약 shuffle에서 30 % 성능이 좋아졌다면, 실제 production cluster에서 성능은 얼마나 좋아졌을까? 수 천 개의 job을 실행하고 benchmark함
  9. 배포
    • 내부 alpha 테스트 : 2011년 11월 시작. 500~800대 정로 테스트
    • alpha 버전 : 2012년 1월 예정. 2천 대의 노드로 구성된 클러스터를 포함하여 만 대 이상의 노드로 테스트
    • beta 버전 : 2012년 1/4분기 늦게. 사실 베타라고 얘기하기도 애매함. 4천 개 이상의 노드로 구성된 클러스터를 포함하여 총 2만 개 이상의 노드로 테스트. 야후에서는 한 달에 2백 만 개의 job을 돌릴 예정. 몇 달 동안 테스트하면, 천 만 개 이상의 job이 테스트됨. 이 정도면 뭐..
    • beta가 끝나면 production. stable 버전이 될 것. 2012년 2/4 분기


'개발관련팁 > Hadoop' 카테고리의 다른 글

HDFS Federation  (0) 2012.01.12
hdfs의 file write protocol에 대한 이해  (0) 2012.01.05
Apache Hadoop 0.23 이야기  (0) 2011.12.21
하둡에서 InputSplit과 HDFS 블록 사이의 관계  (3) 2011.08.08
Hadoop의 InputSplit  (0) 2011.07.10
hadoop에서 map task 숫자 조절하기  (2) 2011.06.02
─ tag  hadoop

예를 들어 hadoop의 TextInputFormat 을 보자. 이 포맷은 하둡의 default input format인데, 파일에서 text들을 라인 단위로 읽어서 map task에게 제공해주는 역할을 한다. (참고로 key는 파일 내에서 각 라인의 시작 지점까지의 바이트 오프셋 값이다.)

여기서 당연히 한 가지 의문이 생기는데, 기본적으로 InputSplit은 HDFS의 블록으로 쪼개진다. (특별히 따로 InputSplit을 정의하지 않았다면) 그럼 HDFS block boundary가 TextInputFormat의 line boundary와 정확히 일치하지 않을 텐데, hadoop에서는 이를 어떻게 처리하고 있을까?

Hadoop: The Definite Guide 책을 보면, 아래와 같은 그림이 나온다.

그리고 설명하기를, block boundary와 input split의 논리적인 단위 (여기서는 line 단위)가 일치하지 않더라도, input split을 잘 계산하여 처리한다고 나와 있다. 이 설명만 놓고 보면, 마치 InputFormat에서 InputSplit을 계산할 때, 이런 논리적인 단위가 나눠지는 것처럼 보인다.

하지만 실제로 InputSplit을 구현한 FileSplit을 코드를 보면, 아래와 같은 정보만을 가지고 있다.

  • private Path file;   // 어떤 file에서 나온 split인가
  • private long start; // file 내에서 해당 input split의 시작 위치
  • private long length;  // input split의 크기
  • private String[] hosts; //  해당 input split을 가지고 있는 data node들의 host name

그리고 이런 FileSplit을 계산해 내는 FileInputFormat 클래스 소스를 살펴보면, 논리적인 단위를 계산하는 부분은 없고, 단지 file을 block boundary 단위로 나누어, FileSplit을 계산한다. 즉, FileSplit이 가지고 있는 정보는, 논리적인 단위가 아닌, 단순히 물리적인 block의 경계에 대한 정보만 가지고 있다. 그렇다면 실제로 block 단위가 아닌, line 단위로 input split을 재계산 해 주는 녀석은 누구일까?

비밀은 바로 RecordReader에 숨어 있다. InputFormat interface는 getSplits() 메소드를 통해 split을 계산해내고, getRecordReader() 메소드를 통해 해당 split을 읽을 수 있는 RecordReader를 반환한다.

그리고 각 MapTask가 실행될 때, map() 메소드를 호출하기 전에 RecordReader의 initialize() 메소드를 한 번 호출한다. 그리고 TextInputFormat이 사용하는 LineRecordReader의 initilize() 메소드를 살펴보면, map task로 전달된 input split이 파일의 첫 번째 block이 아닐 경우, 무조건 첫 번째 line은 skip하도록 구현되어 있다. (start position에 -1을 한 다음, 한 줄을 건너뜀) 이제 저 위 그림이 이해가 될 것이다.

정리를 하면, file을 처리하는 InputSplit은 단순히 물리적인 block 단위 (기본적으로는 hdfs block 단위)로 쪼개져서 map task에게 전달된다. 그리고 각 map task는 이렇게 쪼개진 input split으로부터, 실제로 map() 에서 처리해야 하는 논리적인 record 단위를 계산하고, 이를 바탕으로 InputSplit의 boundary를 다시 계산하여 처리를 하게 되는 것이다.

─ tag  hadoop, inputsplit, 하둡

hadoop은 기본적으로 input split에 의해 계산된 각 split에 대해 하나의 map task를 생성한다. (여담이지만, 실제 map task의 개수는 mapred.map.tasks의 값이나 setNumMapTasks() 에 의해 결정되지 않는다. 자세한 이야기는 여기 참조)

InputSplit은 딱 두 개의 method를 가진 abstract class로 정의되어 있고, 각 method는 다음과 같다.

  1. public abstract long getLength();          // split의 size를 반환
  2. public abstract String[] getLocations(); //split의 data가 저장된 node들을 반환

실제 split을 구현한 class로 FileSplit class가 있다. (DB에 있는 data 역시 input split으로 쓰일 수 있는 모양이지만, 실제로 hadoop에 구현된 class는 없는 듯..)

FileSplit은 아래와 같은 member variable을 가지는데,,

  • private Path file          //파일 이름
  • private long start        // 파일에서 처리할 바이트의 첫 시작 위치
  • private long length      // 처리할 바이트 수
  • private String[] hosts  // 실제 해당 block을 가지고 있는 host들의 list

이러한 InputSplit들은 mapreduce job을 실행하는 단계에서 보면 4. submit job 단계에서 계산이 된다. 즉, JobClient가 JobTracker에게 작업을 제출할 때 계산된다.

image

Firgure 6-1. How Hadoop runs a MapReduce job. 출처 : Hadoop The definite Guide

 

그런데 막상 코드를 보면, 위 그림에서처럼 딱 무 자르듯이 나눠져 있는 건 아니고, 3번 단계에서 job에 필요한 리소스 (jar 파일들, 설정 파일 등등) 들을 hdfs의 특정 경로에 복사를 하는데, 이 특정 경로에 계산된 inputsplit 정보들도 같이 복사가 된다.

그리고 6. retrieve input splits 단계에서는 3~4번 단계에서 복사된 input split가 담긴 파일들을 가져와서 map task에서 넘겨주는 구조임.

input split을 계산하는 단계가 4번인지 6번인지 헷갈려서 조사해봄.

─ tag  hadoop, 하둡

* 정보 공유 차원에서 오늘 한 삽질을 기록으로 남김

  1. 상황
    • map/reduce 프레임웍이 필요한 것은 아니고, 단순하지만 처리하는데 오래 걸리는 작업들을 여러 컴퓨터에 분산하여 처리하고자 함.
    • reduce task 개수를 0으로 주고 mapper only로 설정하여 작업을 시작.
    • 문제 : 현재 hadoop의 scheduler는 preemptive 하지 않기 때문에, 한 번 map task가 시작되면, priority가 낮더라도 해당 map task가 끝날 때 까지 계속 진행됨. 덕분에 나의 단순하고 오래 걸리는 작업들이 한 번 hadoop에서 돌기 시작하면, 다른 긴급한 작업들이 처리되지 못하는 상황이 발생!
    • 시도된 해결책
      • 나의 job에서 동시에 실행되는 map task 개수를 제한하여, 다른 긴급한 작업들이 처리될 수 있도록 함
  2. 삽질.. 삽질..
    • 첫 번째 시도 : mapred.jobtracker.maxtasks.per.job=10 으로 설정
      • 하나의 job 당 최대 task 수를 10으로 설정했으니, 한 번에 실행되는 map task의 수도 제한되지 않을까? 라고 생각했으나,, 실제로는 적용 안됨. ㅜ.ㅜ
    • 두 번 째 시도 : mapred.tasktracker.map.tasks.maximum=1 로 설정
      • 이 설정의 의미는 하나의 task tracker에 의해 동시에 실행되는 map task의 개수
      • 저렇게 설정하면, 하나의 task tracker에 의해서 하나의 map task만 실행되어야 한다. 하지만 실제로는 적용이 안됨.
      • 이 부분에 대해서는 원인을 찾았는데, Hadoop The Definitive Guide라는 책에 보면 아래와 같이 쓰여 있다.
      • Be aware that some properties have no effect when set in the client configuration. For example, if in your job submission you set mapred.tasktracker.map.tasks.maximum with the expectation that it would change the number of task slots for the tasktrackers running your job then you would be disappointed, since this property only is only honored if set in the tasktracker’s mapred-site.html file.

      • 해석하면, mapred.tasktracker.~ 로 실행되는 설정들은 tasktracker daemon에 의해서만 적용되며, client에서 설정한 값은 무시 된다는 말씀.
    • 세 번째 시도 : mapred.map.tasks=10 으로 설정!
      • mapred.map.tasks 설정은 하나의 job 당 map task 의 개수이므로, map task의 개수가 적으면 당연히 동시에 실행되는 map task의 개수도 제한 될 것이라고 생각했지만!! 여전히 적용 안됨.
      • 원인을 찾아본 결과, mapred.map.tasks는 map task의 개수는 늘릴 수 있지만, 하둡이 결정한 map task의 개수 이하로는 줄일 수 없다고 함. 출처 : http://wiki.apache.org/hadoop/HowManyMapsAndReduces
      • 참고로 하둡은 하나의 input split 당 하나의 map task를 생성함. input split이란, 각 input file들을 구성하고 있는 하나 하나의 data block을 뜻함.
    • 발상의 전환 : 굳이 한 번에 실행되는 map task의 개수를 줄일 필요가 있을까?
      • 문제가 되는 상황은 나의 job이 모든 map task slot을 점유하고, 다른 긴급한 job들이 나의 slot을 뺏을 수 없어서 생기는 문제.
      • 그렇다면, 하나의 map task 가 빨리 빨리 종료되도록 단위를 줄이면, 그 사이 다른 priority 가 높은 job들이 종료된 map slot을 차지하여, map task을 실행시킬 수 있게 되어, priority가 역전되는 문제를 회피할 수 있음.
      • map task를 얼마나 늘려야 할까? 심플하게 mapred.map.tasks=100000000 정도로 설정하면, 하나의 map task가 처리하는 단위가 작아지고, 굉장히 빠른 시간에 종료되므로, preemptive 문제는 해결할 수 있지만, 이 경우, input split이 너무 작아질 수 있기 때문에, hadoop에서 배보다 배꼽이 더 큰 상황이 발생할 수 있다.
      • 이때 구세주처럼 등장한 설정이 있으니 바로 mapred.min.split.size ! 하나의 input split의 최소 단위를 설정할 수 있는데, 이 설정을 이용하면, map task가 지나치게 많아지는 것을 방지할 수 있다.
      • 최종 해결책
        • mapred.min.split.size=4194304  (하나의 input split은 최소 4MB 단위로 처리해라)
        • mapred.map.tasks=1000000   (최대한 map task를 잘게 쪼개서, 다른 작업에 방해가 되지 않도록 하자)
        • 실제로 위와 같이 설정하면, 전체 map task가 백 만 개 이하로 적절하게 쪼개져서 동작함. (몇 개로 쪼개지는 지는 전체 input file의 size에 따라 다름)
─ tag  hadoop, 하둡

요즘 업무 때문에 hadoop을 많이 사용하고 있다. 오늘의 삽질 항목은 hadoop에서 iterator를 재사용할 수 없을까? 하는 것이다. hadoop의 reducer는 동일한 key로 묶인 value들을 가져올 수 있도록, Iterable<?> type을 반환해 준다. 그런데 쓰다 보면, 이 reducer의 값들을 두 번 이상 읽고 싶을 때가 있다. 예를 들면, 전체 value들의 개수를 세어서 값들을 출력할 때 함께 찍어주고 싶다거나.. 기타 max, min 값을 구해서 같이 출력 하고 싶다거나 등 은근 필요한 경우가 많다.

처음에는 단순하게 Iterable<text> values 변수를 복사해 뒀다가, 다시 iterator를 호출해서 쓰는 것을 시도했다. 대충 돌려보니 컴파일 오류 안 나고 실행도 잘 되는 듯 했다! 올레를 외치며 대용량 데이터에 적용을 했는데.. 몇 시간 동안 열심히 작업을 하 더니 내 놓은 결과는,,  reducer output records : 0개  -_- 결과 값이 아무 것도 없는.. T_T

구글링을 해 보니, 구현상의 이유로, reducer의 iterator는 두 번 읽을 수 없다고 한다. (요기 참고) 생각해보면, key에 딸린 value들이 어마어마하게 많으면, 파일 블록 하나로 처리가 안될 테니, 여러 파일에서 나눠서 읽어야 할 테고, 여기서 iterator를 다시 호출 할 수 있게 하려면 시스템이 복잡해 질 수도 있겠다 싶었다. (그래도 좀 해주면 안 되려나)

하여간, 이를 극복하기 위해서는,

  1. value들을 모두 list에 따로 저장
  2. value들을 local file에 저장했다 다시 읽기

요 두 가지 방법 정도가 떠올랐다. 처음에 list에 저장하도록 해서 돌렸더니, 여기저기서 out of memory 오류를 뿌리며 죽었다. –_-; 은근 메모리에 다 못 올릴 cluster들이 많은가 보다.

그래서 local file에 저장해서 다시 읽을까 했는데, 작은 cluster에 대해서도 일일이 local file에 저장하고 읽으면 이것도 은근 속도에 bottleneck이 될 듯 하여.. n개 이하는 메모리의 list로 처리하고, n개가 넘어가면, 파일에 저장해서 다시 읽도록 구현해서 돌렸다. 그런데 이게 어디서 오류가 생긴건지 24시간이 지나도록 몇몇 reducer 작업들이 끝나지를 않는다. –_-; 아놔

일단 시간이 없어서, 결국 이번에는 iterator를 한 번 사용해서 결과를 찍고, 결과를 별도로 후처리를 하여 원하는 값을 얻었다. 뭔가 더 깔끔한 방법이 있을 듯 한데,, 방법이 없을까?

─ tag  hadoop, Java, 하둡
openclose

티스토리 툴바