데이터 엔지니어링

[논문리뷰 시작] Piranha: Optimizing Short Jobs in Hadoop

작년 2학기 [빅데이터프로그래밍] 수업에서 공부했던 논문을 정리해보려고 합니다.

수업에서 위의 Hadoop/Spark와 관련된 논문들 중 하나씩 맡아서 공부하고 발표하는 과제가 있었습니다.

저는 Piranha: Optimizing Short Jobs in Hadoop이라는 논문을 맡았는데, 빅데이터를 처리하기 위한 Hadoop에서 small job을 고려하는 이유와 방법이 궁금해서 선택했었습니다.

 

당시에는 이런 논문을 보는게 처음이기도 했고 CS 관련 지식도 부족해서 리딩하는데 상당히 많은 시간을 쏟았었습니다.

하지만 논문을 읽으면서 그 기술이 어떤 배경에서 나왔고, 중요한 개념들은 무엇이며, 어떻게 구현되는지 이해하는데 큰 도움이 되었습니다.

 

개인적으로 하둡 생태계에 관심이 많아서, 관련 논문들을 꾸준히 읽고 정리해볼 예정입니다.

아직 부족함이 많아 좋은 리뷰를 하지는 못하더라도 양해부탁드리며, 피드백은 언제나 환영합니다!!


논문 요약

현재 대다수의 회사들이 빅데이터의 분산처리를 위해 Large-scale MapReduce/Hadoop 환경을 구축하고 있다. 하지만 실제 workload에서 실행되는 job들을 살펴보면 80% 정도가 small job임 확인할 수 있다. 논문에서는 Hadoop 환경에서도 small job을 효율적으로 처리할 수 있도록 Piranha task, Rendezvous 등의 개념을 제시하고 있다.

Piranha의 주요 특징은 다음과 같다.

  1. check-pointing을 하지 않는 대신 실패 시 전체 job을 재시작한다.
  2. DAG 형태를 가지는 small job을 위해 간소화된 fault-tolerance 방법을 사용한다.
  3. 하둡에 의존하지 않는 self-coordinating을 수행한다.

Introduction

하이 레벨 프로그래밍 언어가 클라우드 컴퓨팅 분야에도 사용되면서 관련 시스템들을 더욱 쉽게 사용할 수 있게 되었다. 이에 따라 많은 사람들은 클러스터를 data warehouse 뿐만 아니라 쿼리를 집중적으로 처리하기 위한 용도로도 사용하기 시작했다. 여기에는 두 가지 특징이 있는데,

  1. large scale production job 뿐만 아니라 많은 'ad hoc query(short/small job)'를 함께 처리
  2. 이 쿼리를 처리하기 위한 데이터를 모두 모으면 하나의 DBMS로 감당하지 못할 만큼 양이 많음

Hadoop과 같은 프레임워크들은 scalability, reliability, throughput에 중점을 두고 있기 때문에, 위와 같은 빠른 응답이 필요한 short job은 좋은 성능을 내기가 어렵다.

Motivation

1. Workload Characterization

먼저 자주 언급되는 ad hoc query란 비정형적인 query이다. 일반적으로 여러 사용자가 자주 사용하는 업무용 query의 경우 잘 작성하여 프로그램의 형태로 저장해두겠지만, 단발적으로 특정 데이터가 필요한 경우가 생길 수 있다. 이 경우 사용자가 직접 SQL을 사용하여 DB에 쿼리를 요청한다. 이런 경우를 ad hoc query라고 하는 것이다.

 

Yahoo!의 Hadoop 클러스터에서 실행되는 job을 관찰하며 어떤 특성이 있는지 알아보자.

 

Job Sizes(runtime, mappers, reducers)

  1. [Job Sizes] 현재 클러스터에서 처리되는 job 중 80%는 실행시간이 10분 미만이며 평균 15개의 mapper를 가지고, 75%는 reducer를 단 1개만 가지고 있다.
  2. [Failure Rates] Yahoo! Hadoop job들의 실패 확률은 포아송 분포를 따른다고 볼 수 있다. 1번의 값들을 식에 대입해보면 small job의 실패 확률은 거의 0에 가깝다는 것을 확인할 수 있다.
  3. ★[Chained Jobs] 70% 이상의 job은 SQL과 같은 하이 레벨 언어로 작성되었으며, 이는 곧 서로 연결된 MapReduced jobs으로 이뤄진 directied graph로 만들어지게 된다. 보통 2~3 단계로 이뤄져있다.

Chained Jobs에 대해 논문에서 든 예시 쿼리는 "What are the top 10 web search queries for today?"이다.

이 쿼리는 두 개의 MapReduce job으로 만들어지는데 첫번째는 각 search query의 빈도를 계산하는 job이고, 두번째는 빈도로 정렬하여 top 10개를 뽑아내는 job이다. 두 job이 연속으로 수행되어 최종 결과가 나온다.

 

하지만 하둡은 연결된 MapReduce 형태를 지원하지 않기 때문에, chained job을 구성하게되면 두 가지 영향이 생긴다.

  1. 한 job의 output은 후속 job이 시작되기 전까지 HDFS에 저장되어야 한다.
  2. job을 수행하기 위한 데이터가 Hadoop pipeline에서 준비가 완료되면 Map 단계가 호출되어야 한다.

결국 위 그림과 같은 구조로 처리하게 되면, HDFS에 총 4번 접근해야 한다. 이런 추가적인 과정을 수행하는 동안 job의 latency는 증가하게 된다.

2. Opportunity

지금까지 살펴본 short job의 특징으로부터 우리는 다음과 같은 '기회'를 발견할 수 있다.

 

먼저 job이 거의 실패하지 않기 때문에 disk에 많이 접근해야 하는 check-pointing을 하지 않을 수 있다.

 

Check-pointing은 큰 데이터를 처리할 때 작업이 실패하면 다시 시작하기 위한 비용이 크므로, 중간에 결과를 저장하고 롤백함으로써 비용을 최소화 하기 위해 사용한다. 하지만 small job은 실패 확률도 낮을 뿐만 아니라 재시작에 필요한 비용 또한 비싸지 않다. 따라서 실패를 관리하는 방법으로 재시작을 사용하면 check-pointing으로 인한 오버헤드와 디스크 접근을 줄일 수 있고, Hadoop의 정교한 fault-tolerance나 JobTracker의 비싼 heartbeats protocol을 사용하지 않을 수 있다.

 

결과적으로 task들은 Hadoop에 의존하지 않고 Self-coordinate 할 수 있다. Job이 실패해도 별도의 과정 없이 스스로 복구할 수 있다는 의미이다.

Piranha

지금까지의 내용을 바탕으로 Hadoop의 middleware로 사용할 수 있는 Piranha 시스템을 제안한다.

특징은 short job에 최적화 되어있다는 것이다!

1. Design(개념)

1) 3 Types of Piranha Tasks

Piranha Job은 위 그림과 같은 구조를 가지며 3개의 task로 구성된다.

  1. Initial Tasks: 외부 파일 시스템으로부터 데이터를 읽어오고, 자식 노드로 결과를 전달한다. 부모 노드가 없다.
  2. Intermediate Tasks: 부모로부터 데이터를 받아오고, 자식 노드로 결과를 전달한다.
  3. Terminal Tasks: 부모로부터 데이터를 받아오고, 파일 시스템에 결과를 저장한다. 자식 노드가 없다.

또한 Piranha Job은 self organizing 된다고 말한다. 작업을 완수하기 위해 서로 협력한다. 따라서 외부 인프라의 영향을 최소화 할 수 있다. 가장 처음 데이터를 받아오는 initial tasks에 대한 실행 및 스케쥴링만 외부 인프라에 맡긴다. 이외의 과정은 Job 내에서 자동적으로 이루어진다.

2) Rendezvous between tasks

3) Data transfer between tasks

4) Fault-tolerance