[BigData] Spark 공부

Knowledge 2016. 10. 19. 16:32
spark.html

Basic of Spark

이하의 내용은 ‘Learning Spark’ 책의 3장까지 공부한 내용을 바탕으로 작성한 글입니다.
소스코드는 책의 것을 일부 인용했습니다.

소스는 모두 Python으로 작성됩니다.


About Spark

Spark는 분산 데이터 연산/처리 플랫폼이다.
기존에 Hadoop은 일괄 데이터 처리에 효율적이었으나 현재는 실시간으로 증가하는 빅데이터를 처리해야 하기에 Hadoop의 맵리듀스 프레임워크는 사용하기 힘들다.

맵리듀스 프레임워크는 실시간 데이터 처리 과정에서 많은 트래픽과 디스크 I/O를 요구할 수 있고, 맵->리듀스로 이어지는 2단계 구조로 데이터를 프로세싱하기 때문에 반복 작업이나 산발적으로 일어나는 데이터 연산/처리에는 부적합하다.


Spark는 맵리듀스 모델을 대화형 명령어 쿼리나 스트리밍 처리가 가능하도록 확장하였다.
Spark는 기본적으로 인메모리 기반으로 데이터를 연산하기 때문에 디스크 기반 연산을 수행하는 맵리듀스보다 훨씬 뛰어난 성능을 보이지만 이를 디스크에서 돌린다고 하더라도 맵리듀스보다는 뛰어난 성능을 보여 준다.


Spark는 Scala 기반으로 개발되었으며 Python, Scala, Java, SQL 등의 라이브러리를 내장해 지원한다.
다른 빅데이터 툴들과도 연계가 잘 되는데, Spark는 Hadoop 클러스터 위에서 실행할 때 특히 좋은 성능을 보인다.
그렇다고 Hadoop을 꼭 필요로 하는 것이 아니라 단순히 Hadoop API를 사용하는 저장 시스템을 지원할 뿐이다.



RDD 다루기

모든 Spark 애플리케이션은 클러스터에서 다양한 병렬 연산을 수행하는 드라이버 프로그램으로 구성된다.
드라이버 프로그램들은 연산 클러스터에 대한 연결을 나타내는 SparkContext 라는 객체를 통해 Spark에 접속하는데, Python이나 Scala로 셸을 통해 접속할 경우 자동적으로 sc라는 변수에 만들어지므로 따로 만들 필요는 없다. 단일 프로그램으로 Spark를 다룰 때는 다음과 같이 SparkContext를 만들어 줘야 한다.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

SparkContext를 이용하면 이것으로 RDD라는 것을 만들어낼 수 있다.
RDD는 Resilient Distributed Dataset의 약자로, 단순하게는 분산되어 존재하는 데이터 요소들의 모임이다.
RDD는 여러 개의 파티션으로 이루어져 있으며 개수는 사용자가 직접 지정이 가능하다. 파티션의 개수에 맞춰서 병렬 연산을 수행하기 때문에 많은 파티션으로 나눠져 있으면 빨라지는 것은 사실이지만, 너무 많은 파티션으로 나누는 것도 효율이 좋지 않을 수 있다. 이를 효율적으로 적용하는 것은 개발자의 몫이다.

이미지
(출처 : http://ourcstory.tistory.com/147)


위 이미지를 보면 코어가 3개인 PC에서 파티션을 4개 만들었을 때와 3개 만들었을 때 작업 처리에 걸리는 시간을 나타내고 있다.

Spark는 Hadoop API를 지원하기 때문에 HDFS, Local File System, Cloud Service 등에서 데이터를 받아와 RDD를 만들 수 있다.
또한 파일을 읽어서 RDD를 만들 수도 있으며 parallelize 함수에 직접 데이터를 입력하여 RDD로 변환하는 것도 가능하다.

1) parallelize 함수 사용

다음과 같이 sc의 parallelize 함수에 데이터를 인자로 넘겨 주면 그것으로 RDD를 만들 수 있다.

sc = SparkContext(conf = conf)
myRDD = sc.parallelize([1, 2, 3, 4])

2) 외부 데이터 사용

대표적으로 textFile 함수를 이용해 외부의 데이터를 받아와 RDD를 만들 수 있다.

sc = SparkContext(conf = conf)
myRDD = sc.textFile("Path_of_File")


RDD가 일단 한 번 만들어지면 이를 수정하는 것은 불가능하다.
RDD는 두 가지 타입의 연산을 지원하는데, 트랜스포메이션(Transformation)액션(Action)이다.
트랜스포메이션은 연산 결과 값으로 새로운 RDD를 리턴하고, 액션은 RDD를 제외한 데이터 타입을 리턴한다.
여기서 트랜스포메이션이 연산 결과 값으로 새로운 RDD를 리턴한다고 했는데, RDD는 수정이 불가능하다고 한 이유가 여기에 있다.
RDD가 수정되는 것이 아닌, 새로운 RDD를 하나 더 만들어서 변경된 값으로 채워 넣는 것이다.

Spark에서 자주 사용하는 트랜스포메이션과 액션은 다음과 같은 것들이 있다.

Transformations & Actions

이미지
(출처 : http://dirtysalt.github.io/spark-rdd-paper.html)
더 자세한 내용은 여기 참고



연산 처리 방식

Spark는 트랜스포메이션과 액션을 처리할 때 늘 여유로운 방식(lazy evaluation)으로 액션이 실행되는 시점에 처리한다.
예를 들어, sc.textFile(…) 을 실행하여 나온 RDD에 filter() 를 실행해 새로운 RDD를 만들고, 그 RDD에 first() 라는 Action을 수행하는 다음과 같은 코드를 생각해 보자.


lines = sc.textFile("README.md")
filteredLines = lines.filter(lambda line: "Python" in line)
print filteredLines.first()


일반적으로 생각해 보면 sc.textFile(…) 에서 이미 모든 파일을 읽어서 버퍼에 내용을 저장했을 것이다.
이후 filter() 를 수행하면서 그 많은 줄들을 전부 필터링했을 것이고 마지막으로 first()에서 가장 첫 번째 줄을 반환했을 것이다.
이는 매우 비효율적인 데이터 처리 방식이다. 결국 필요한 것은 “Python”이라는 문자열이 들어가는 첫 문장인데 저 방식을 그대로 사용하면 파일 전체를 읽어서 전체를 필터링하게 된다.


그래서 Spark는 액션 연산인 first()가 나오기 전까지의 트랜스포메이션은 meta에 저장해 두고 액션이 실행될 때 그에 맞추어서 트랜스포메이션을 실행한다. “Python”이라는 문자열이 나올 때까지만 파일을 읽고, 찾으면 바로 중단 후 반환하는 식으로 실행하는 것이다. 확실히 파일 전체를 읽는 것보다는 훨씬 효율적이다.


이렇게 연산을 실행하면 매우 효율적으로 보이지만 문제점이 하나 있다.
Spark는 액션 연산을 실행할 때마다 트랜스포메이션 연산을 다시 실행하므로 트랜스포메이션을 실행한 후 생성된 같은 RDD로 연산을 연속해서 수행할 경우 이미 트랜스포메이션을 통해 RDD를 한 번 생성했음에도 불구하고 같은 RDD를 계속해서 새로 생성하게 된다.
다시 예를 들어 다음과 같은 코드를 보자.


lines = sc.textFile("README.md")
filteredLines = lines.filter(lambda line: "Python" in line) # Transformation
print filteredLines.first()   # Action
print filteredLines.collect() # Action


비슷한 코드지만 마지막에 액션을 한 번 더 하고 있다.
이렇게 트랜스포메이션 이후 반복적으로 액션을 할 경우 여유로운 방식에 의해 lines.filter()는 3번째 줄에서 한 번, 4번째 줄에서 한 번 더 실행된다.
여기서 보기에는 별거 아닌 것처럼 보일지도 모르지만 데이터를 여러 번 수행하는 반복 알고리즘에 대해서는 매우 무거운 작업일 수 있다.

이런 반복 작업을 방지하기 위한 것이 persist() 라는 함수로, RDD를 메모리에 저장한 후 이후 해당 RDD를 사용할 때 저장된 RDD를 사용하도록 하는 것이다. 이를 영속화(Caching)라고 부른다. 메모리에 저장된 RDD는 LRU(Least Recently Used : 최근에 사용된 것들은 남겨 두고 오래된 것들을 버리는 알고리즘)에 의해 메모리가 가득 차거나 따로 해제하지 않는 이상 자동으로 해제되지 않으므로 프로그램 종료 전에 반드시 unpersist() 를 이용해 메모리에서 해제해야 한다.


lines = sc.textFile("README.md")
filteredLines = lines.filter(lambda line: "Python" in line)
filteredLines.persist(StorageLevel.DISK_ONLY)
print filteredLines.first()
print filteredLines.collect()
filteredLines.unpersist()


persist()는 5종류의 인자를 받을 수 있는데 이는 각각 다음과 같은 특징을 갖는다.



 레벨

 공간 사용

 CPU 사용 시간

 메모리에 저장

 디스크에 저장

 비고

 MEMORY_ONLY

 높음

 낮음

 예

 아니오

 

 MEMORY_ONLY_SER

 낮음

 높음

 예

 아니오

 

 MEMORY_AND_DISK

 높음

 중간

 일부

 일부

 

메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장

 MEMORY_AND_DISK_SER

 낮음

 높음

 일부

 일부

 

메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장. 메모리에 직렬화된 상태로 저장

 DISK_ONLY

 낮음

 높음

 아니오

 예

 



'Knowledge' 카테고리의 다른 글

PCB (Process Control Block)  (0) 2016.11.14
[Assembly] Intel x86, Local JMP [0xE9]  (0) 2016.11.07
Visual Studio 2008에서 libcurl 사용하기  (0) 2016.09.05
RC4 Stream Cipher  (0) 2016.07.19
스트림 암호 (Stream Cipher)  (0) 2016.07.18
블로그 이미지

__미니__

E-mail : skyclad0x7b7@gmail.com 나와 계약해서 슈퍼 하-카가 되어 주지 않을래?

,