Cloud computing for Scalable Network Management - Jérôme François


NOMS 04/20/2012

Cloud computing for Scalable Network Management http://lorre.uni.lu/~jerome/files/tutoNOMS2012.pdf

Jérôme François. This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 2.5 license (http://creativecommons.org/licenses/by-nc-sa/2.5/, except screenshots from applications and when mentioned)

Introduction Hadoop In practice Complex jobs Applications Tools Installation A

Outline 1 2 3 4 5 6 7 8

Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 2 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A

Outline 1 2 3 4 5 6 7 8

Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 3 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Outline 1 Introduction Cloud computing ? The tutorial 2 Hadoop 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 8 Alternatives

4 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Definition of cloud computing I

Wikipedia definition: Cloud computing refers to the provision of computational resources on demand via a computer network, such as applications, databases, file services, email, etc

I

Twenty-One Experts Define Cloud Computing, Jan. 09, http://cloudcomputing.sys-con.com/node/612375 I I

scalability on demand (within minutes or seconds) cost aspects: I I

I I I I

avoid huge investments (infrastructure, training...) outsourced and pay-as-you-go service

web-based applications virtualization: SaaS, PaaS, IaaS easy deployment and configuration / ∼ grid but crash-proof allow people to access (massively) technology-enabled services → all Internet applications 5 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Definition of cloud computing I

Cloud computing and Grid Computing 360-Degree Compared, I. Foster et al., GCE ’08 — why ?

A large-scale distributed computing paradigm that is driven by economies of scale, in which a pool of abstracted, virtualized, dynamically-scalable, managed computing power, storage, platforms, and services are delivered on demand to external customers over the Internet.

within minutes 6 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Reinvent the wheel ? I

Differences with distributed computing / grid-computing I I

I

I

elasticity: resources are available “instantaneously”
the cloud may be more open (public cloud) ≠ Grid computing ∼ private or community network
→ easy to use

Scalability shift from power to storage resources

7 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Outline 1 Introduction Cloud computing ? The tutorial 2 Hadoop 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 8 Alternatives

8 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Objective of the tutorial I I

Explain MapReduce paradigm Introduce Hadoop for running MapReduce job I I I

I

I I I

I

components data flows configuration

Network management data intensive problem resolution Practical examples Alternatives installation and management of cloud architectures... but some basics to build up your own cluster or use Amazon Web Service handling cloud related issues: security, reliability... 9 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Cloud computing ? The tutorial

Materials I

My inspiration: I

official websites: I I I I

I

http://hadoop.apache.org/ http://www.cloudera.com/ http://pig.apache.org/ http://hbase.apache.org/

books: I I I

Hadoop in Action, Chuck Lam, Manning Publications Pro Hadoop, Jason Venner, Apress Hadoop: The Definitive Guide, Tom White, O’Reilly Yahoo Press

10 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A

Outline 1 2 3 4 5 6 7 8

Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 11 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 12 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

What it is?

I I I I I

Open-source framework
Distributed applications for processing large data files
Easy to use (and deploy) → no need to be an expert
Robust: assumes that there are malfunctioning nodes
Popular (http://wiki.apache.org/hadoop/PoweredBy):

non expert / small companies universities large companies: FaceBook, Google, Ebay, Twitter... 13 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Data-intensive processing I

Large data files handling = data-intensive processing I I

I

≠ computation-intensive work
small processing on many pieces of information contained in data files
another way to distribute the work:

large data are usually already stored on clusters the code is transferred where the data is the code is executed where the data is no data transfer → avoid network overhead

14 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Data-intensive processing Network data volumes increase I

Cisco measured and forecasted Internet traffic (1000 PB/day) [chart: PB/month per year, 1994-2014]

0.7 TB/day in Luxembourg 15 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Scale-Up vs Scale-Out I

Scale-up
  add more resources on a single machine
  very expensive: high performance for I/O → SSD, SAS, SATA 3.0 (hard drives + controllers)
Scale-out
  add more machines with fewer resources
  less expensive

example: read 10 TB

                     High-end server    Common machine
Channel throughput   200 MB/s           50 MB/s
Scaling              x2 channels        x20 machines
Time                 7:00               2:45

(10 TB at 2 x 200 MB/s = 400 MB/s takes about 7 hours; at 20 x 50 MB/s = 1000 MB/s it takes about 2 h 45.)

16 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

History I I

I

I

I

2004: Google File System and Map-Reduce Implementation of MapReduce for Nutch (Apache project): open-source search engine → Hadoop is created by Doug Cutting 2006: Doug Cutting hired by Yahoo! for working on Hadoop 2008: 10,000+core cluster in production for Yahoo! search engine 2009: Amazon Elastic MapReduce is released and uses Hadoop 17 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 18 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Approach I

Data processing model with two key components: I I

I

the mappers execute the programs on data subsets the reducers aggregate the results from the mappers

toy example: wordcount, IPAddressCount I

count the number of flows per source IP address

Input flows:
IP Src    IP Dst
1.1.1.1   2.2.2.2
3.3.3.3   2.2.2.2
3.3.3.3   4.4.4.4
4.4.4.4   5.5.5.5
2.2.2.2   4.4.4.4
1.1.1.1   2.2.2.2

Result:
IP Src    Count
1.1.1.1   2
2.2.2.2   1
3.3.3.3   2
4.4.4.4   1

19 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Src IP address count I

One loop over the lines + a hash table

for each li in lines:
    src_IP = get_src_IP(li)    //Tokenization
    IP_count[src_IP]++
end for

multiple files: 1 loop over files + 1 loop over the lines + a hash table

for each f in files:
    for each li in lines(f):
        src_IP = get_src_IP(li)    //Tokenization
        IP_count[src_IP]++
    end for
end for

20 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Map-Reduce and Src IP address count I

Multiple input files/file subsets Mappers

[Diagram: three input files of flows; each mapper emits per-IP counts for its own file (e.g. 1.1.1.1,1  3.3.3.3,2  4.4.4.4,1  2.2.2.2,1  1.1.1.1,1); a single reducer aggregates them into the final (IP Src, Count) table]

Issues: I

I I

file transfer overhead → the mapper program is transferred where the data is 1 reduce = 1 single point of failure each mapper has to memorize the hashtable before sending it 21 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Map-Reduce and Src IP address count I

Pseudo-code of Mappers:

for each li in lines(f):
    src_IP = get_src_IP(li)    //Tokenization
    IP_count[src_IP]++         //IP_count may be big in memory
end for
send(IP_count)                 //Send to the reducer

Pseudo-code of the reducer:

for each received_IP_count:
    for each key in received_IP_count:
        IP_count[key] = IP_count[key] + received_IP_count[key]
    end for
end for

22 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Mapper optimization I

Don’t wait for the end of the process to send the results
  for each parsed IP address, send the information with a count = 1
  the reducer is already assigned to the aggregation task

[Diagram: the mappers emit one (IP, 1) pair per parsed flow (1.1.1.1,1  3.3.3.3,1  3.3.3.3,1  4.4.4.4,1  2.2.2.2,1  1.1.1.1,1); the reducer aggregates them into the final (IP Src, Count) table]

23 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Map-Reduce and Src IP address count I

Pseudo-code of Mappers:

for each li in lines(f):
    src_IP = get_src_IP(li)    //Tokenization
    send([src_IP,1])
end for

Pseudo-code of the reducer = the same:

for each received_IP_count:
    for each key in received_IP_count:
        IP_count[key] = IP_count[key] + received_IP_count[key]
    end for
end for

24 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Multiple reducers I

Process the reducing task in parallel I

partition the output of the Mappers:
  1. 1.1.1.1, 2.2.2.2
  2. 3.3.3.3, 4.4.4.4
for a given IP address, all counts have to be sent to the same reducer (shuffle)
multiple outputs with disjoint results

[Diagram: the mappers emit (IP, 1) pairs; the shuffle routes each IP to its partition's reducer; reducer 1 outputs (1.1.1.1, 2), (2.2.2.2, 1) and reducer 2 outputs (3.3.3.3, 2), (4.4.4.4, 1)]

25 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Multiple reducers I

Pseudo-code of Mappers (the reducer_of function is the result of the partitioning):

for each li in lines(f):
    src_IP = get_src_IP(li)                 //Tokenization
    send([src_IP,1], reducer_of(src_IP))
end for

Pseudo-code of the reducer = the same:

for each received_IP_count:
    for each key in received_IP_count:
        IP_count[key] = IP_count[key] + received_IP_count[key]
    end for
end for

26 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Formal Map-Reduce I

Map-Reduce I I I

a set of mappers: Mappers
a set of reducers: Reducers
approach based on ⟨key, value⟩ pairs:
  map input: ⟨k1, v1⟩ (k1: line number, filename... but rarely used for further usage)
  intermediate between mappers and reducers: ⟨k2, v2⟩
  reduce output: ⟨k3, v3⟩
partitioner: k2 → Reducers

[Diagram: list(⟨k1,v1⟩) (line numbers) → Mappers → list(⟨k2,v2⟩) → Shuffle → ⟨k2, list(v2)⟩ → Reducers → list(⟨k3,v3⟩)]

27 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Formal Map-Reduce I

∀k2, the values v2 are aggregated into a single list
  network overhead reduction
  the ⟨k2, v2⟩ pairs are sorted by key (then easy to aggregate):
    partial sort on the mapper machine
    final sort on the reducer machine
  ∼ hash table on the mapper (a prior issue)
    managed by the (Hadoop) framework
    optimizations already built in (buffers, mix between hard drive and in-memory storage)
    the final user/developer does not have to take care of it

[Diagram: the Mappers emit list(⟨k2,v2⟩); sort and shuffle group them per key into ⟨k2, list(v2)⟩; the Reducers aggregate and output list(⟨k3,v3⟩)]

28 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

The Black-Box I

I

Hadoop = MapReduce framework with many abstractions
Easy to use for novices and advanced enough for experts
  programming is limited to the core of the mappers and reducers = the processing specific to the application
  the “Black-Box” may be fine-tuned by configuration and programming

[Diagram: only the Mapper (⟨k1,v1⟩ → ⟨k2,v2⟩) and the Reducer (⟨k2,list(v2)⟩ → ⟨k3,v3⟩) are programmed; input splitting, sort, shuffle, aggregation and the output to multiple files are handled by the framework]

29 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 30 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

The Need of a Specific File System I I

HDFS: Hadoop Distributed File System Why a specific file system I I

I

paradigm shift: send the code to the data → data has to be previously distributed

A file in HDFS is broken into multiple blocks
  the mappers process the blocks they have
  blocks are maintained to keep redundancy (3 copies)

[Diagram: 1 file is split into Block 1, Block 2, Block 3, stored on the native file systems of different nodes; each mapper processes its local block]

31 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Files, Blocks and Splits I

Blocks and splits I I I

I

a block is a physical division (64 MB) a split is a logical division they are not necessarily aligned → a line of a file may be split over two blocks

Using multiple files is still possible I I

each file will be split by blocks Hadoop is more efficient with bigger files I I

manage easily 100 TB or more in a single file abstract the handling of large files to the user

32 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 33 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

HDFS I

NameNode = metadata of the HDFS I I

I

manage replication no transfer through this node → avoid overhead

Datanodes on slave machines: I I

file blocks I/O continuously reporting to Namenode

[Diagram: the user sends data / gets results through the namenode on master node 1, which maintains the block division; (1)-(3) the client asks the namenode which blocks/datanodes to request and performs block I/O directly with the datanodes on the slave nodes; (1')-(4') the datanodes store blocks on their local file systems, serve replication requests and data transfers between themselves, and continuously report to the namenode]

34 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Map-Reduce I

Jobtracker: I I

I

assigns the tasks monitors the tasks and reassignment if failure

Tasktracker: I I I

executes the tasks locally
reports to the jobtracker
outputs the map results to the reducers

[Diagram: the user submits a job to the jobtracker on master node 2; the jobtracker assigns tasks to the tasktrackers on the slave nodes and monitors them through heartbeats; each tasktracker runs map and reduce tasks locally, and the map output is sent to the reducer selected by hash(key)]

35 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Global architecture I

Namenode: a single point of failure secondary Namenode for taking metadata snapshot manual intervention to recover

I I

[Diagram: master node 1 runs the namenode (block division maintaining) and a backup node runs the secondary namenode, which takes HDFS metadata snapshots; master node 2 runs the jobtracker (task assignments and monitoring); the user sends data / gets results and submits jobs to the masters; each slave node runs a datanode (HDFS on the local file system) and a tasktracker (map/reduce) performing local I/O]

36 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 37 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Write data I

Node to node communications I I

I I

replication → creation of a pipeline
the client splits the file and requests the Namenode for each individual block of 64 MB (steps 1 and 3 are repeated per block)
the Namenode provides a list of datanodes to the client
an acknowledgment mechanism waits until all datanodes are ready

[Diagram: (1) write request for Myfile.txt (blocks 1,2,3); (2) write reply with the datanodes of each block (Block1: DN 1,3,4; Block2: DN 2,3,4; Block3: DN 1,2,4); (3)-(5) the write request is forwarded along the pipeline of datanodes (DN 1 → DN 3 → DN 4); (6)-(8) the write acknowledgments travel back to the client]

38 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Write data I

Writing data I I

I

data is transmitted through the pipeline
the “reverse” pipeline is used to propagate acknowledgments of written data
each datanode and the client report to the Namenode

[Diagram: each block of Myfile.txt is streamed along its datanode pipeline (e.g. Block1 through datanodes 1, 3, 4) and the corresponding Ack travels back; the datanodes report the stored blocks to the namenode]

39 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Write data I

Writing data

[Sequence diagram for one block: the client initializes the pipeline with a write request acknowledged by datanodes 1, 3 and 4; data packets are then streamed through the pipeline and each packet is acknowledged back; a “write end” message closes the pipeline and the write success is recorded by the namenode]

40 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Read data I

Read data I

I

locate data: the Namenode provides a list of locations for each data block
get one replica of each block (the first location in the list) by requesting the datanodes directly

[Diagram: (1) read request for Myfile.txt; (2) read reply with the datanodes of each block (Block1: DN 1,3,4; Block2: DN 2,3,4; Block3: DN 1,2,4); (3)-(8) the client requests block1, block2 and block3 from one datanode each and receives the data directly from them]

41 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

MapReduce I

Map I I I

the user submits a job
the jobtracker requests tasktrackers to map on local data
the tasktrackers report to the jobtracker

[Diagram: (1) the user submits a job working on Myfile.txt; (2) the jobtracker asks the namenode for the block locations; (3) the namenode returns the datanodes of each block (Block1: DN 1,3,4; Block2: DN 2,3,4; Block3: DN 1,2,4); (4) map tasks on block1, block2 and block3 are assigned to tasktrackers holding the blocks locally; (5) the tasktrackers report progress to the jobtracker, which (5') reports back to the user]

42 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

MapReduce Reduce

[Diagram: same architecture as for the map phase; (6) the map outputs are sent to the reduce tasks; (7) the tasktrackers report progress to the jobtracker, which (7') reports back to the user; (8) the reduce output is written to HDFS]

43 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Outline 1 Introduction 2 Hadoop Overview MapReduce HDFS Components Communications Reliability 3 In practice 4 Complex jobs 5 Applications 6 Tools 7 Installation 44 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Issues I

Availability of data

I

I

replication... ...ok but how to make it efficient ?

Efficient task execution → how to select the tasktrackers
How to handle failures:
  disk I/O, communications, failures in processes, non-responding nodes...
  TCP-based Hadoop communication is not sufficient

45 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Block allocation Avoid data loss → optimized replica locations I

I

I

→ rack awareness: I

I

I

I

replication consumes disk resources but also network resources → place replicas in close locations; replication should also protect against a global rack failure

1st and 3rd replicas in the same rack 2nd replica in a different rack other replicas in random racks

manual configuration (dfs.network.script)

[Diagram: three racks, each with its own switch and datanodes, connected by a core switch; for each block, two replicas sit in the same rack (high performance) and a third replica in a different rack (rack failure protection)]

46 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Heartbeat I

I

TCP heartbeat of the datanodes every 3 seconds
Block report every 10 heartbeats → the namenode has a global view of data replication
  → requests datanodes to copy under-replicated blocks (piggybacking)
locality awareness
  HDFS reads go to the closest datanodes
  distance between nodes = sum of the distances to their common ancestors

[Diagram: (1) the datanodes send block reports to the namenode (block division maintaining); (2) the namenode keeps the metadata (Block1: DN 1,3,4; Block2: DN 2,3,4; Block3: DN 1,2,4); (3) it asks datanodes to copy under-replicated blocks; (4) the blocks are streamed directly between datanodes]

47 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

HDFS pipeline I

Acknowledgment mechanism I

I I

I

I

two counters per node: number of bytes received + number of bytes acknowledged by the downstream datanode
an ACK is sent only when the ACK from downstream is received
→ consistency = data is visible (ack to the namenode) when all datanodes have it
failure → reconstruct the pipeline with the remaining nodes + generation timestamp

Pipeline creation failure I I I I

no ack / error ack = failure → propagate information in the “reverse” pipeline ready datanodes close the block file and TCP connections client requests namenode for another set of datanodes (append: replace only failed datanodes) 48 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Task assignation I I I

I

I

Heartbeat → HDFS metadata up-to-date + available nodes
Fixed number of slots for map and reduce per node
Select mappers → locality awareness
  select a local mapper for each block if available
  a closer one otherwise (rack awareness in priority)
Select reducers (no locality)
  ∼ random selection
  next available reduce slots
task failure = exception thrown by the JVM or timeout
  reassign the task, avoiding nodes that have failed previously
  too many failures (mapred.[map|reduce].max.attempts) → stop the task
  useful partial results → allow a level of failure (mapred.max.[map|reduce].failures.percent)
49 / 242
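For illustration, the failure thresholds mentioned above can be overridden per job through JobConf; a sketch with arbitrary example values:

import org.apache.hadoop.mapred.JobConf;

// Illustrative only: per-job overrides of the failure-handling properties above.
public class FailureTuning {
  public static void tune(JobConf job) {
    job.setMaxMapAttempts(4);                // mapred.map.max.attempts
    job.setMaxReduceAttempts(4);             // mapred.reduce.max.attempts
    job.setMaxMapTaskFailuresPercent(5);     // mapred.max.map.failures.percent
    job.setMaxReduceTaskFailuresPercent(5);  // mapred.max.reduce.failures.percent
  }
}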

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Job scheduling I

Default scheduler I I

I

FIFO + priority can be assigned to jobs, but no preemption; not suited for multi-user deployment

Fair scheduler (developed by Facebook)

jobs into pools (∼ users) with conditions on slot allocation
  performance → take all available slots
  fair → preemption (task kill) to readjust the slot usage according to the configuration

Example allocation file:

<allocations>
  <pool name="pool1">
    <minMaps>5</minMaps>
    <minReduces>5</minReduces>
    <weight>2.0</weight>
  </pool>
  <user name="user1">
    <maxRunningJobs>6</maxRunningJobs>
  </user>
  <userMaxJobsDefault>3</userMaxJobsDefault>
</allocations>

50 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Job scheduling

I

Capacity scheduler (developed by Yahoo) I I I I

jobs are allocated to queues each queue has a configurable number of slots remaining slots are equally divided into queues each queue is managed with FIFO + priorities

51 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Secondary namenode I

namenode I

I

volatile memory : snapshot of the current HDFS meta-data → fast query replies + 2 files I I I

I

I

FsImage: a past snapshot
EditLog: incremental changes
FsImage + EditLog → generation of an up-to-date snapshot (archive, initialization)

Single point of failure

Secondary namenode I

1st objective: namenode recovery, performance improvement I I I

huge EditLog on busy clusters primary namenode busy with real time queries secondary namenode delegated to create snapshots 52 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Secondary namenode I

recovery = replace namenode by secondary namenode = manual intervention I I I

FsImage + EditLog in dfs.name.dir directory property dfs.name.dir not used by the secondary namenode → copy namenode files to secondary namenode (dfs.name.dir can specify multiple directories for this purpose) via a mounting point

<property>
  <name>dfs.name.dir</name>
  <value>localdir,remotedir</value>
</property>

last step: change the hostname / IP address (or reroute traffic) of the secondary namenode to play the role of the primary namenode (datanodes cache the IP address!) 53 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Overview MapReduce HDFS Components Communications Reliability

Other mechanisms I

Concurrent read and write issues: I

I

I

I

write → obtain and renew a lease for preventing concurrent writing heartbeat mechanism: preemption of the lease after inactivity, file closure after a long period (1 hour) of inactivity concurrent reads is allowed

Corrupted file blocks I I I

1 data block → 1 checksum read = (block + checksum) → verification by the client if error → take another replica and inform the datanode to get a valid replica 54 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A

Outline 1 2 3 4 5 6 7 8

Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 55 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 56 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Installation I

Different installation mode: I

I

I

I

fully distributed: master machine + backup machine + slave machines (for production) standalone: single machine, no Hadoop Daemon (for development/debugging purposes) pseudo distributed: a single machine with the different daemon (for development/debugging purposes, memory usage, interaction)

Cloudera I I I

http://www.cloudera.com/ Hadoop-based services Cloudera’s Distribution including Apache Hadoop (CDH): Hadoop + plug-ins + packaging → easy-to-[install|use|deploy] 57 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 58 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Program Workflow

[Diagram: write data → analysis → read results, repeated for several analyses]

Write data ∼ upload data
  can be network intensive
Analysis might be intensive
  multiple analyses of the same data
  incremental updates
Read results

read / write = I/O on HDFS is essential
HDFS cannot be accessed using standard UNIX commands → hadoop fs -cmd

59 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Example I

core-site.xml:

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:8020</value>
</property>

I

ls Example I I

hadoop fs -ls
hadoop fs -ls hdfs://localhost:8020/

60 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Example I

core-site.xml:

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:8020</value>
</property>

I

ls Example I I

hadoop fs -ls
hadoop fs -ls hdfs://localhost:8020/

[Screenshot of the listing: no replication for a directory (= structure only); replication level = 1 for files; by default the user directory is listed]

60 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Prepare data I

hadoop fs, hadoop fs -help ls → Goal display sample.txt get a local copy of sample.txt remove sample.txt create a directory for data copy flows100000.csv into the data directory

I

Cmd

sample file extracted from the dataset provided in Sperotto, Anna and Sadre, Ramin and Vliet van, Frank and Pras, Aiko (2009) A Labeled Data Set For Flow-based Intrusion Detection. In: IP Operations and Management. Lecture Notes in Computer Science 5843, pp. 39-50. ISBN 9783642049675

I

each line is a record: id, src ip, dst ip, #packets, #bytes, start time, start msec, end time, end msec, src port, dst port, tcp flags, proto

61 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Prepare data I

hadoop fs, hadoop fs -help ls →

Goal                                            Cmd
display sample.txt                              hadoop fs -cat sample.txt
get a local copy of sample.txt                  hadoop fs -get sample.txt localSample.txt
remove sample.txt                               hadoop fs -rm sample.txt
create a directory for data                     hadoop fs -mkdir data
copy flows100000.csv into the data directory    hadoop fs -put flows1000000.csv data

sample file extracted from the dataset provided in Sperotto, Anna and Sadre, Ramin and Vliet van, Frank and Pras, Aiko (2009) A Labeled Data Set For Flow-based Intrusion Detection. In: IP Operations and Management. Lecture Notes in Computer Science 5843, pp. 39-50. ISBN 9783642049675

I

each line is a record: id, src ip, dst ip, #packets, #bytes, start time, start msec, end time, end msec, src port, dst port, tcp flags, proto

61 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Monitoring I

Web GUI: localhost:50070

I

status/health of HDFS free space HDFS browsing

I I

62 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Monitoring I

Web GUI: localhost:50070

I

status/health of HDFS free space HDFS browsing

I I

data size on DFS free physical space failures safe removing

62 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 63 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Types I

Key and values have specific types (serialization requirements): I I I I

I

package org.apache.hadoop.io
value types implement Writable
key types implement WritableComparable
the default types (WritableComparable) are wrappers for standard types: [Boolean|Byte|Double|Float|Integer|Long]Writable, Text, NullWritable
defining custom types is possible
  Writable → readFields, write
  WritableComparable → + compareTo

64 / 242
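To make the custom-type option concrete, here is a minimal sketch of a hypothetical WritableComparable key (the class IpPortKey and its fields are illustrative and not part of the tutorial):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical custom key type: an (IP, port) pair usable as a MapReduce key.
public class IpPortKey implements WritableComparable<IpPortKey> {
  private long ip;
  private int port;

  public IpPortKey() { }                        // no-arg constructor required by Hadoop

  public void write(DataOutput out) throws IOException {     // serialization
    out.writeLong(ip);
    out.writeInt(port);
  }

  public void readFields(DataInput in) throws IOException {  // deserialization
    ip = in.readLong();
    port = in.readInt();
  }

  public int compareTo(IpPortKey o) {           // needed to sort keys
    if (ip != o.ip) return (ip < o.ip) ? -1 : 1;
    if (port != o.port) return (port < o.port) ? -1 : 1;
    return 0;
  }

  public int hashCode() {                       // used by the default partitioner
    return (int) (ip * 163 + port);
  }
}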

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Types I

The MapReduce program has to read and Write on the HDFS → specific format I

input → InputFormat

Class: TextInputFormat
  Record: each line
  Key: line offset (LongWritable)
  Value: line content (Text)

Class: NLineInputFormat
  Record: similar to TextInputFormat, but each split is exactly N lines, defined by mapred.line.input.format.linespermap
  Key: line offset (LongWritable)
  Value: line content (Text)

Class: KeyValueTextInputFormat
  Record: each line
  Key: before the separator defined by key.value.separator.in.input.line (default \t) (Text)
  Value: after the separator (Text)

Class: TeraInputFormat
  Record: each line
  Key: 10 first chars (Text)
  Value: the rest of the line (Text)

65 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Types I

The MapReduce program has to read and Write on the HDFS → specific format I

output → OutputFormat

Class: TextOutputFormat
  Description: 1 record (key tab value) per line (the separator is defined by textoutputformat.separator)
Class: SequenceFileOutputFormat
  Description: specific format for passing binary data through multiple MapReduce programs
Class: TeraOutputFormat
  Description: 1 record (key value) per line
Class: NullOutputFormat
  Description: no output

66 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Code Skeleton

[Diagram: only the Mapper (⟨k1,v1⟩ → ⟨k2,v2⟩) and the Reducer (⟨k2,list(v2)⟩ → ⟨k3,v3⟩) have to be programmed; input splitting, sort/shuffle/aggregate and the output to multiple files are handled by the framework]

public class MyMapReduce {
  // genericity + interfaces: K1, V1, K2, V2, K3, V3 are the key/value types
  public static class Map extends MapReduceBase
      implements Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, OutputCollector<K2, V2> output,
                    Reporter reporter) throws IOException {
    }
  }

  public static class Reduce extends MapReduceBase
      implements Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output,
                       Reporter reporter) throws IOException {
    }
  }
}

67 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Code Skeleton I

OutputCollector is a wrapper for managing output I

I

I

output.collect(key,value)

Reporter is used to send status message along the process (advanced programming) recent stable release update (this code may not work in the future) I I

0.20 still compatible no compatibility in the future I I

I

OutputCollector → Context package structure changes

a main is required! 68 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Code skeleton

public class MyMapReduce {
  public static class Map extends MapReduceBase implements Mapper<K1, V1, K2, V2> { }

  public static class Reduce extends MapReduceBase implements Reducer<K2, V2, K3, V3> { }

  // the types below have to be coherent with the Map and Reduce classes
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyMapReduce.class);
    conf.setJobName("MyMapReduceName");

    conf.setInputFormat(InputFormat.class);        // define input and output format
    conf.setOutputFormat(OutputFormat.class);
    conf.setOutputKeyClass(K3.class);              // define output key and value type
    conf.setOutputValueClass(V3.class);

    conf.setMapperClass(Map.class);                // define mappers and reducers
    conf.setReducerClass(Reduce.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));   // define the input data
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));  // and where to store the output

    JobClient.runJob(conf);                        // RUN!
  }
}

69 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Code skeleton I

Output types of the mapper can also be defined I I I

I

by default: same as reducer key: job.setMapOutputKeyClass() value: job.setMapOutputValueClass()

Don’t forget to include packages: I I

old/current API: mapred package
new API: mapreduce package

Old API:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

New API:

import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

70 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount

public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private LongWritable ip = new LongWritable();

  public void map(LongWritable key, Text value,
                  OutputCollector<LongWritable, IntWritable> output,
                  Reporter reporter) throws IOException {
    String line = value.toString();
    Pattern regExpPattern = Pattern.compile(",");
    String items[] = regExpPattern.split(line);
    ip = new LongWritable(Long.parseLong(items[1]));
    output.collect(ip, one);
  }
}

public static class Reduce extends MapReduceBase
    implements Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
  public void reduce(LongWritable key, Iterator<IntWritable> values,
                     OutputCollector<LongWritable, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

71 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount

public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(IPCount.class);
  conf.setJobName("MyMapReduce");

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}

72 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Compile and Run 1. compile I I

eclipse: automatic build or click terminal: javac -classpath /usr/lib/hadoop-0.20/ hadoop-0.20.2-cdh3u0-core.jar path/IPCount.java

2. create a jar I I

eclipse: click, click and click terminal: jar -cvf IPCount.jar -C DirectoryWithClasses DirectoryInTheJar

3. execute: hadoop jar IPCount.jar YourClassWithMain data out 4. look at the results in out 73 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

JobTracker I I

Web GUI: localhost:50030 status of the job, tasks (progress, failures)

74 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Code skeleton with the New API

public class MyMapReduceNewAPI {
  // genericity: the Context gets its types from the class parameters
  public static class Map extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
        throws IOException, InterruptedException { }
  }

  public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
        throws IOException, InterruptedException { }
  }

  public static void main(String[] args) throws Exception {
    // JobConf is divided between Job and Configuration
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Counting IP addr with the new API");
    job.setJarByClass(MyMapReduceNewAPI.class);

    job.setInputFormatClass(InputFormat.class);
    job.setOutputFormatClass(OutputFormat.class);
    job.setOutputKeyClass(K3.class);
    job.setOutputValueClass(V3.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //job.submit();
    job.waitForCompletion(true);
  }
}

75 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Configuration I

configuration files are defined by XML:

<property>
  <name>PropName</name>
  <value>PropValue</value>
  <description>PropDesc</description>
</property>

I

JobConf / Configuration class : I

I

properties of config files can be set/overridden or read: addResource(XMLfile), set(name,value), get(name); runtime properties can be set:

I

setNumMapTasks(n): only a hint, it is derived from the split division (logical division of blocks, not exactly aligned) → mapred.min.split.size property in mapred-default.xml setNumReduceTasks(n), setNumMapTasks(n) 76 / 242
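A small sketch of these calls (old API); the file name my-site.xml and the property name my.sampling are placeholders, not from the slides:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Minimal sketch: set/read properties and tune the number of tasks.
public class ConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource("my-site.xml");        // load an additional XML configuration file
    conf.set("my.sampling", "10");          // custom runtime property (hypothetical name)

    JobConf job = new JobConf(conf, ConfExample.class);
    job.setNumReduceTasks(4);               // effective number of reduce tasks
    job.setNumMapTasks(4);                  // only a hint: the real value comes from the splits

    System.out.println(job.get("my.sampling"));   // read a property back
  }
}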

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Fair scheduler Installation I

contrib module I

I

I

HADOOP HOME/contrib/fairscheduler (/usr/lib/hadoop/contrib/fairscheduler/hadoopfairscheduler-0.20.2-cdh3u0.jar) add the jar file to default lib (HADOOP HOME/contrib) to load or modify the classpath (HADOOP CLASSPATH)

mapred-site.xml:

<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>

default pool name = user

<property>
  <name>mapred.fairscheduler.poolnameproperty</name>
  <value>user.name</value>
</property>

77 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Fair scheduler Installation I I

Web interface: http://localhost:50030/scheduler

78 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 79 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Combiner I

IPCount: collects a lot of 1s → network overload
Do a local aggregation
  combiner: a local reducer on the same machine doing a map → implements Reducer
  the result has to be the same with or without instantiating the combiner
  ≠ the hashtable example: the combiner does not wait for all results (buffered)

[Diagram: mappers + combiners emit ⟨k2, sublist(v2)⟩ instead of one ⟨k2,v2⟩ per record; sort/shuffle/aggregate and the reducers are unchanged]

a combiner with IPCount → conf.setCombinerClass(Reduce.class)

80 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

GenericOptionsParser I I

I

I

command line argument interpretation configuration: file, namenode (can be remote), jobtracker (can be remote), common files to deploy... custom properties

the Tool interface I I I I

I

handle the mix between custom and standard arguments
used with Configured to implement the basic getter and setter
ToolRunner uses GenericOptionsParser and Tool
the javadoc provides the standard way to program a MapReduce application (new API)
implementation of the run method (previously in main)

81 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

(old) API

public int run(String[] args) throws Exception {
  Configuration conf = getConf();
  JobConf job = new JobConf(conf, MyMapReduce.class);

  Path in = new Path(args[0]);
  Path out = new Path(args[1]);
  FileInputFormat.setInputPaths(job, in);
  FileOutputFormat.setOutputPath(job, out);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);
  job.setInputFormat(InputFormat.class);
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(K3.class);
  job.setOutputValueClass(V3.class);

  JobClient.runJob(job);
  return 0;
}

public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(), new MyMapReduce(), args);
  System.exit(res);
}

82 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

IPCount with ToolRunner I

I

I I

int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args); change properties with the command line: increase the number of reduce tasks results use the first field of the csv file (an index) as the key for the mapper

83 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

IPCount with ToolRunner I

I

int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args); change properties with the command line: increase the number of reduce tasks I

I I

hadoop jar IPCount.jar IPCountConfig -D mapred.reduce.tasks=10 data out

results use the first field of the csv file (an index) as the key for the mapper

83 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

IPCount with ToolRunner I

I

int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args); change properties with the command line: increase the number of reduce tasks I

I I

hadoop jar IPCount.jar IPCountConfig -D mapred.reduce.tasks=10 data out

results → one file per reducer use the first field of the csv file (an index) as the key for the mapper

83 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Easy configuration I

IPCount with ToolRunner I

I

int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args); change properties with the command line: increase the number of reduce tasks I

I I

hadoop jar IPCount.jar IPCountConfig -D mapred.reduce.tasks=10 data out

results → one file per reducer use the first field of the csv file (an index) as the key for the mapper I I

I

input as KeyValueTextInputFormat → do the split... ...but use specific types → change the type for the mapper definition (Mapper < Text, Text, LongWritable, IntWritable >) hadoop jar IPCount.jar IPCountKeyInput -D mapred.reduce.tasks=1 -D key.value.separator.in.input.line=, data out

83 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

HDFS access I

I

ToolRunner → recycle easily the code based on current context Network management/monitoring → repeat tasks and keep all or a recent analytics I I I

by default: existing files are not overwritten → avoid asking the user to prepare the HDFS by hand (rmr)
API for HDFS: the FileSystem class

final FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
  fs.delete(new Path(args[1]), true);
}

84 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Predefined Mappers and Reducers I

Hadoop provides standard Mappers and Reducers

I

may have predefined input/output types (use them carefully)
examples: TokenCountMapper (splits text into keys with a value of one), LongSumReducer (sums all values for one key), IdentityReducer (directly forwards the output of the Mapper)

New API is currently less complete

Old New 85 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Predefined Mappers and Reducers I

How to: I

IPcount with predefined mappers/reducers

86 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Predefined Mappers and Reducers I

How to: I

IPcount with predefined mappers/reducers I

I

I

I

I I

I

I

FieldSelectionMapReduce mapper to split the line using a separator
define the separator: job.set("mapred.data.field.separator", ",")
define the key/value pairs: job.set("map.output.key.value.fields.spec", "1:1")
no useful reducer: LongSumReducer cannot handle Text (the type output by FieldSelectionMapReduce) → define a specific reducer
type errors between mapper and reducer ← by default the output types of the mapper and of the reducer are assumed identical → define the types of the mapper: job.setMapOutputKeyClass(Text.class), job.setMapOutputValueClass(Text.class)
difference of output between versions: different sorting if the key output type is Text

86 / 242
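As an illustration of these steps, a minimal sketch (old API): FieldSelectionMapReduce and the two property names come from the slide, while the class names IPCountPredefined and CountReducer are hypothetical:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;

public class IPCountPredefined {

  // Specific reducer: counts how many values (flows) were received for each Text key.
  public static class CountReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, LongWritable> {
    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, LongWritable> output,
                       Reporter reporter) throws IOException {
      long count = 0;
      while (values.hasNext()) { values.next(); count++; }
      output.collect(key, new LongWritable(count));
    }
  }

  static void configure(JobConf job) {
    job.set("mapred.data.field.separator", ",");         // CSV separator
    job.set("map.output.key.value.fields.spec", "1:1");  // key = field 1 (src IP)
    job.setMapperClass(FieldSelectionMapReduce.class);   // predefined mapper
    job.setReducerClass(CountReducer.class);
    job.setMapOutputKeyClass(Text.class);                // FieldSelectionMapReduce emits Text
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
  }
}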

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Predefined Mappers and Reducers I

Issues I I

Count the number of packets sent by each IP address
predefined mappers/reducers + combiner

87 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Predefined Mappers and Reducers I

Issues I I

Count the number of packets sent by each IP address
predefined mappers/reducers + combiner



mapper and reducers similar to previously error type using the reducer as the combiner → define a specific combiner

Map FieldSelectionMapReduce



Reduce

e

on

Cl



Combiner



87 / 242
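A minimal sketch of such a specific combiner (the class name PacketSumCombiner is hypothetical, and it assumes the mapper emits the #packets field as a Text value): it performs the same sum as the reducer but keeps the map output types (Text, Text), which avoids the type error; it is enabled with job.setCombinerClass(PacketSumCombiner.class).

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Combiner with the same (Text, Text) output types as the mapper.
public class PacketSumCombiner extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterator<Text> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
    long sum = 0;
    while (values.hasNext()) {
      sum += Long.parseLong(values.next().toString());  // partial #packets sum
    }
    output.collect(key, new Text(Long.toString(sum)));  // re-emit as Text
  }
}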

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Custom Parameters of Map Reduce I

Parameters can be retrieved by mappers and reducers I I

I

get the JobConf object public void configure(JobConf job) is called at the initialization and may be customized

Extended IPCount I

apply sampling + compare → #pkts sum for every x flows of each IP address ×x

88 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Custom Parameters of Map Reduce I

Parameters can be retrieved by mappers and reducers I I

I

get the JobConf object public void configure(JobConf job) is called at the initialization and may be customized

Extended IPCount I

apply sampling + compare → #pkts sum for every x flows of each IP address ×x I I I

get the 3rd command line argument and save it in Jobconf get it within the configure method reducer compute the global sum as the sampled one using the modulo operator (%)

88 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Custom Parameters of Map Reduce I

Parameters can be retrieved by mappers and reducers
I get the JobConf object: public void configure(JobConf job) is called at initialization and may be customized
Extended IPCount
I apply sampling + compare → sum the #pkts of every x-th flow of each IP address and multiply by x
I get the 3rd command-line argument and save it in the JobConf
I retrieve it within the configure() method
I the reducer computes the global sum as well as the sampled one, selecting flows with the modulo operator (%)

public static class Reduce {
  public void configure(JobConf job) {
    sampling = Integer.parseInt(job.get("sampling"));
  }
}

public int run(String[] args) throws Exception {
  job.set("sampling", args[2]);
}

88 / 242
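A sketch of how the reducer could use the sampling parameter (an illustration only: it assumes the mapper emits one (IP address, #pkts) pair per flow with a LongWritable value, which is not shown on this slide):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public static class Reduce extends MapReduceBase
    implements Reducer<Text, LongWritable, Text, Text> {

  private int sampling = 1;

  public void configure(JobConf job) {
    // parameter set in run() with job.set("sampling", args[2])
    sampling = Integer.parseInt(job.get("sampling"));
  }

  public void reduce(Text ip, Iterator<LongWritable> values,
      OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
    long globalSum = 0, sampledSum = 0;
    int i = 0;
    while (values.hasNext()) {
      long pkts = values.next().get();
      globalSum += pkts;
      if (i % sampling == 0) {   // keep one flow out of every 'sampling' flows
        sampledSum += pkts;
      }
      i++;
    }
    // compare the exact sum with the extrapolated sampled sum
    out.collect(ip, new Text(globalSum + " vs " + (sampledSum * sampling)));
  }
}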

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 89 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

If you don’t like Java... I

I

Hadoop is developed in Java but MapReduce programs may use other languages
Streaming
I uses standard I/O: STDIN and STDOUT as the input and output of the mapper and the reducer
I every language can be used but has to be installed on all cluster nodes beforehand
I a dedicated jar: hadoop-streaming-0.20.2-cdh3u0.jar

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -help
Usage: hadoop hadoop-streaming.jar [options]
Options:
  -input     DFS input file(s) for the Map step
  -output    DFS output directory for the Reduce step
  -mapper    The streaming command to run
  -combiner  The streaming command to run
  -reducer   The streaming command to run

90 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Inputs and Outputs

I Text records as: key <tab> value (the value may be empty)
I Generic options are supported → custom separator
I Mapper:
  I input: each line of the input data is fed to STDIN
  I output: each line of STDOUT
I Reducer:
  I input: the outputs of the mappers, sorted per key, are fed line by line to STDIN
  I output: each line of STDOUT is written to the output file
[Figure: streaming dataflow — the mapper program reads list(k1,v1) from multiple input files via STDIN and emits (k2,v2); after sort/shuffle/aggregate, the reducer program reads the grouped (k2,v2) pairs via STDIN and writes the final list(k3,v3) to multiple output files]

91 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities + your preferred language

92 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities + your preferred language
I the script is not present on the remote computers → error
I deploy the script on the cluster: option -file
I the mapper has to do the key/value split on its own
I different separators can be used: the output of the mapper is translated for the reducer

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -D stream.map.output.field.separator=, -input data -output out7 -mapper ’IPCountMR.py map’ -reducer ’IPCountMR.py reduce’ -file ’IPCountMR.py’

92 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Python code

import sys  # needed by the script (not shown on the original slide)

def map():
    for line in sys.stdin:
        tab = line.split(",")
        print tab[1], ",", 1

def reduce():
    last_seen = None
    total = 0
    for line in sys.stdin:
        tab = line.split("\t")
        if last_seen == None or tab[0] != last_seen:
            if last_seen != None:
                print last_seen, ":", total
            last_seen = tab[0]
            total = 0
        total += 1
    # flush the last key
    print tab[0], ":", total

if __name__ == "__main__":
    if sys.argv[1] == "map":
        map()
    else:
        reduce()

93 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities without scripts

94 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities without scripts I

use the Unix commands
I the map output values sent to the reducer may be empty
I cut + uniq

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out8 -mapper 'cut -d , -f 2' -reducer 'uniq -c'

94 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities without scripts I

use the Unix commands
I the map output values sent to the reducer may be empty
I cut + uniq

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out8 -mapper 'cut -d , -f 2' -reducer 'uniq -c'

the aggregate package I

the mapper has to specify the function to apply: function:key tab value

94 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

IPCount I

IPCount using streaming capabilities without scripts I

use the Unix commands
I the map output values sent to the reducer may be empty
I cut + uniq

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out8 -mapper 'cut -d , -f 2' -reducer 'uniq -c'

the aggregate package I

the mapper has to specify the function to apply: function:key tab value

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out9 -mapper ’IPCountMapperAggregate.py’ -reducer aggregate -file ’IPCountMapperAggregate.py’

94 / 242
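For illustration, with the aggregate reducer the mapper of this exercise would emit lines such as the ones below (LongValueSum is one of the aggregate package's built-in functions; the exact key layout is an assumption, not shown in the slides):

LongValueSum:3752920950	1
LongValueSum:3752920950	1
LongValueSum:3752921533	1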

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 95 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Rights management I

HDFS files: user / group owner, read / write / execute permissions
I hadoop dfs command / Java API:
  [-chmod [-R] PATH...]
  [-chown [-R] [OWNER][:[GROUP]] PATH...]
  [-chgrp [-R] GROUP PATH...]
I quotas: hadoop dfsadmin
  I limit the number of files per directory → avoid too many files (Hadoop prefers big files!): [-setQuota ...] [-clrQuota ...]
  I limit space usage: [-setSpaceQuota ...] [-clrSpaceQuota ...]
96 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

User management I

Cluster usage per user:
I permissions + quotas = limit HDFS usage
I job scheduling = limit computational resource usage
→ this enforces a form of security → enough to balance cluster usage among trusted and non-hostile users (within a team for example)
I user authentication:
  I OS user = Hadoop user, based on whoami and bash -c groups
  I superuser = the user who started the Hadoop daemons
  I impersonating users is very simple (change the whoami command)
I node-to-node communication: Hadoop customized Java RPC
  I no authentication
  I no encryption
97 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Attacker playground
I a lot of room for attackers + "local" attacks if users can directly access the machines (shared machines)

[Figure: the standard Hadoop architecture as an attack surface — a unique master (master node 1: namenode maintaining the block division; master node 2: jobtracker assigning and monitoring tasks), a secondary namenode / backup node holding an HDFS metadata snapshot, slave nodes each running a datanode over the local file system and a tasktracker for Map Reduce, and the user who submits the application, sends data and gets results]

98 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Security in Hadoop I

Cloud computing and security = a big challenge
I CIA properties
I outsourced environment → threats from the service providers and from other clients
I many reports / research on that = a large domain → not the objective of this presentation...
I ...but how to make Hadoop a bit more secure:
  I isolate the cluster to a group of trusted users
  I isolate the Hadoop machines (put them in dedicated VMs)
  I + security features appeared only recently in Hadoop (end of 2011)
99 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security

Security in Hadoop I

Kerberos
I Simple Authentication and Security Layer (SASL)
I Generic Security Services Application Program Interface (GSSAPI)
I mutual authentication: user/namenode + delegation token to limit the load on the KDC (Key Distribution Center)

[Figure: Kerberos-based authentication in Hadoop — (1) the user authenticates against Kerberos, (2) obtains a delegation token from the namenode (master node 1), (3) the user is authenticated to the jobtracker (master node 2) with a MAC, (4) the task requests a block access token, (5) accesses the block on the datanode, (6) the datanode verifies the token; namenode and datanodes rely on a shared secret k]
100 / 242

100 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A

Outline 1 2 3 4 5 6 7 8

Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 101 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Outline 1 Introduction 2 Hadoop 3 In practice 4 Complex jobs Joining data sources Job chaining 5 Applications 6 Tools 7 Installation 8 Alternatives

102 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

The Problem I

Join two sources of data on a shared key
Example
I extract the NetFlow records of suspect hosts
I data sources: NetFlow records, list of suspect IP addresses
I shared key: the source IP address

Flows.csv
888,3752921443,2463760020,14,1623,1222173626,357,1222173630,810,22,2295,27,6
889,3752919153,2463760020,14,1623,1222173626,359,1222173630,982,22,3054,27,6
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
891,3752921425,2463760020,16,1739,1222173626,364,1222173630,662,22,1715,27,6
892,3752921479,2463760020,14,1623,1222173626,364,1222173630,947,22,1894,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
894,2463760020,3752921531,12,1152,1222173627,237,1222173630,824,1048,22,27,6
895,2463760020,3752921533,12,1152,1222173627,288,1222173630,939,1413,22,27,6
896,3752921531,2463760020,14,2317,1222173627,475,1222173630,824,22,1048,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6
898,2463760020,3752980554,13,1204,1222173627,965,1222173630,399,2399,22,27,6
899,2463760020,3752980558,14,1256,1222173628,54,1222173630,418,1966,22,27,6

Suspects.txt
3752920950
3752921533

Join result (records whose source IP is in Suspects.txt):
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6

103 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

DataJoin package I

A specific package: DataJoin I

requirement: add the package to the classpath (hadoop-env.sh):
export HADOOP_CLASSPATH="/usr/lib/hadoop/contrib/datajoin/hadoop-datajoin-0.20.2-cdh3u0.jar:$HADOOP_CLASSPATH"

I output of the map: tagged data (depending on the data source) + group keys
I input of a reducer: all tagged outputs for a single key

[Figure: the mappers tag each record with its source and emit the source IP address as group key — Flows.csv records become (tag=flows, key=srcIP, value=record) and Suspects.txt entries become (tag=addr, key=IP); after shuffling, each reducer receives all tagged records sharing one group key, e.g. key 3752920950 gets two flows-tagged records and one addr-tagged record]

104 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

DataJoin I

A specific package: DataJoin
I reducer:
  I reduce(): cross-product of the values of each group key, taking the values with the same tag at once
  I combine() function: produces the final output, applied on each combination generated by reduce()
  I → example: copy the record if its source IP address is in the suspect host list

[Figure: for group key 3752920950, reduce() builds the cross-product of the flows-tagged and addr-tagged values; combine() is applied to each combination and emits the joined records, e.g. 3752920950,2463760021 and 3752920950,2463760023]

105 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

I TaggedMapOutput = map output data (key + value) + tag
I defines the type of the data (depends on the conf) + getter + setter/constructor: Text
I serialization: write(), readFields() + a default constructor without arguments (for serialization)
I Hadoop Writable types can be used for the tag and the data (Text)

public static class MyTaggedOutput extends TaggedMapOutput {
  private Type data;
  public MyTaggedOutput()
  public MyTaggedOutput(Writable val)
  public Writable getData()
  public void readFields(DataInput arg0) throws IOException
  public void write(DataOutput arg0) throws IOException
}

106 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

Implementation I

taggedMapOutput = map output data (key+value) + tag

public static class MyTaggedOutput extends TaggedMapOutput {
  private Text value;
  public MyTaggedOutput() {
    value = new Text("");
  }
  public MyTaggedOutput(Text data, Text tag) {
    value = data;
    setTag(tag);
  }
  public Writable getData() {
    return value;
  }
  public void write(DataOutput out) throws IOException {
    tag.write(out);
    value.write(out);
  }
  public void readFields(DataInput in) throws IOException {
    tag.readFields(in);
    value.readFields(in);
  }
}

107 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

Mapper implementation
1. generateInputTag() determines the tag (Text) from the input file name
2. map():
   I generateTaggedMapOutput() wraps the map output (key, value) in a tagged output
   I generateGroupKey() generates the group key from a tagged output
I reminder: Strings cannot be compared with ==, and Text objects cannot be compared to Strings with equals()

public static class Map extends DataJoinMapperBase {
  protected Text generateInputTag(String inFile)
  protected TaggedMapOutput generateTaggedMapOutput(Object arg0)
  protected Text generateGroupKey(TaggedMapOutput arg0)
}

108 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

Mapper implementation

public static class Map extends DataJoinMapperBase {
  protected Text generateInputTag(String inFile) {
    if (inFile.contains("data")) {
      return new Text("Traffic");
    } else {
      return new Text("Suspect");
    }
  }
  protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
    TaggedMapOutput retv = new MyTaggedOutput((Text) arg0, inputTag);
    return retv;
  }
  protected Text generateGroupKey(TaggedMapOutput arg0) {
    if (this.inputTag.toString().equals("Traffic"))
      return new Text(((Text) arg0.getData()).toString().split(",")[1]);
    else
      return (Text) arg0.getData();
  }
}

109 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

Reducer implementation: very easy! → only combine()
I applied to each group key
I from a list of tags and values: tags[i] is associated to values[i]
I tagged output: the tag is not important
I final output: group key, value of the tagged output
I example: check the number of tags and output the value corresponding to the flow record

public static class Reduce extends DataJoinReducerBase {
  protected TaggedMapOutput combine(Object[] tags, Object[] values) {
  }
}

110 / 242


Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation
I Reducer implementation: very easy! → only combine()

public static class Reduce extends DataJoinReducerBase {
  protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    if (tags.length == 2) {
      if (tags[0].toString().equals("Suspect"))
        return new MyTaggedOutput((Text) ((MyTaggedOutput) values[1]).getData(), (Text) tags[0]);
      else
        return new MyTaggedOutput((Text) ((MyTaggedOutput) values[0]).getData(), (Text) tags[0]);
    }
    return null;
  }
}

111 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

main/run I

standard definition

112 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Implementation I

main/run
I standard definition
I defines the right types: tagged output

FileInputFormat.setInputPaths(job, in);
FileInputFormat.addInputPath(job, in2);
FileOutputFormat.setOutputPath(job, out);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MyTaggedOutput.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MyTaggedOutput.class);

112 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Poor performances I

I a nice, flexible and abstracted way to do a join operation, but...
I ...poor performance
  I the virtual machine and the lack of a real cluster are not good excuses
  I the join is only done at the reduce stage:
    I the output of the mapper is probably much bigger than really needed
    I the output of the mapper has to be sorted and shuffled over the network
    I the join will create many more combinations than necessary

113 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Poor performances I

Solutions:
I the join may be done by the mappers → extra operations to guarantee the accessibility of the data
I a better design for the usual case: join between one big file and several small ones → the small files can be put on every machine for the mappers (no huge overhead); a minimal map-side join sketch follows below

[Figure: mapper-side view of the join — Flows.csv records tagged as flows and Suspects.txt entries tagged as addr, with the source IP address as group key (no reduce stage shown)]

114 / 242
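A minimal sketch of such a map-side join, assuming the small suspect list is shipped to every node with the DistributedCache (file layout, field index and class name are illustrative, not from the slides); with job.setNumReduceTasks(0) the matching records are written directly by the mappers:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Map-side join: the small suspect list is loaded in memory on every node,
// so only the matching flow records are emitted and nothing is shuffled.
public static class MapSideJoin extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, NullWritable> {

  private final Set<String> suspects = new HashSet<String>();

  public void configure(JobConf job) {
    try {
      // file previously registered with DistributedCache.addCacheFile(...) in run()
      Path[] cached = DistributedCache.getLocalCacheFiles(job);
      BufferedReader in = new BufferedReader(new FileReader(cached[0].toString()));
      String line;
      while ((line = in.readLine()) != null) {
        suspects.add(line.trim());
      }
      in.close();
    } catch (IOException e) {
      throw new RuntimeException("cannot load the suspect list", e);
    }
  }

  public void map(LongWritable offset, Text record,
      OutputCollector<Text, NullWritable> out, Reporter reporter) throws IOException {
    String srcIp = record.toString().split(",")[1];   // field 1 = source IP address
    if (suspects.contains(srcIp)) {
      out.collect(record, NullWritable.get());
    }
  }
}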

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Outline 1 Introduction 2 Hadoop 3 In practice 4 Complex jobs Joining data sources Job chaining 5 Applications 6 Tools 7 Installation 8 Alternatives

115 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Simple chaining I

I Sequential run of Map Reduce applications (easiest design)
I Workflow: Map1 - Reduce1 - Map2 - Reduce2...
I One global application:
  I JobClient.runJob(job) waits for the end of the job before returning
  I → sequential calls to JobClient.runJob(job)
I Network monitoring use case
  I #pkts count per IP address
  I the user defines a file with IP addresses related to potential risks: spam, botnet, DoS
  I the #pkts are summed per risk
116 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Simple chaining
I First Map Reduce as previously
I Output file of the 1st Map Reduce = input file of the 2nd Map Reduce = temporary file → deleted at the end using FileSystem
I Second Map Reduce:
  I get the suspect IP addresses via an XML configuration file: job.addResource(new Path(args[2]))
  I define configure(JobConf) in the mapper for retrieving the properties
  I one reducer per risk

117 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Simple chaining

// Config file (excerpt): IP_botnet = 3709183202,3709183222,3709183228 (addresses of a botnet) ...

public static class Reduce {
  public void reduce() throws IOException {
    // sum of pkts per IP address
  }
}

public static class MapperAttacker {
  public void configure(JobConf job) {
    // read the config file with the suspect IP addresses
  }
  public void map() throws IOException {
    // for each IP address, if it is a suspect one,
    // add its packets to the corresponding attack
  }
}

public int run(String[] args) throws Exception {
  // Job 1
  JobConf job = new JobConf(conf, IPCountPktsAttacksChain.class);
  job.setMapperClass(FieldSelectionMapReduce.class);
  job.setReducerClass(Reduce.class);
  FileOutputFormat.setOutputPath(job, new Path(myTempPath));
  JobClient.runJob(job);

  // Job 2: chaining, with additional properties for the job configuration
  JobConf job2 = new JobConf(conf, IPCountPktsAttacksChain.class);
  job2.addResource(new Path(args[2]));
  FileInputFormat.setInputPaths(job2, new Path(myTempPath));

  // predefined mapper and reducer
  job2.setMapperClass(MapperAttacker.class);
  job2.setReducerClass(LongSumReducer.class);
  JobClient.runJob(job2);

  // HDFS cleaning
  fs.delete(new Path(myTempPath), true);
}

118 / 242

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Dependent jobs I

I Multiple jobs with dependencies → JobControl
I no direct execution as with JobClient.runJob(JobConf)
I needs to create a Job from a JobConf: new Job(JobConf)
I add jobs: JobControl.addJob(Job)
I add a dependency: job2.addDependingJob(job1)

[Figure: simple chaining as a linear sequence of Map Reduce jobs vs. complex chaining as a dependency graph (Map Reduce 2...6) enabling parallel runs]

JobControl orders the jobs, implements Runnable (thread-based design) and can be monitored (running/failed jobs...); a minimal sketch follows below 119 / 242
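A minimal sketch of this pattern (old API; the two job configurations are assumed to exist already and the surrounding names are illustrative):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public static void runChain(JobConf jobConf1, JobConf jobConf2) throws Exception {
  Job job1 = new Job(jobConf1);
  Job job2 = new Job(jobConf2);
  job2.addDependingJob(job1);        // job2 starts only once job1 has succeeded

  JobControl control = new JobControl("chain");
  control.addJob(job1);
  control.addJob(job2);

  // JobControl implements Runnable: run it in its own thread and monitor it
  Thread t = new Thread(control);
  t.start();
  while (!control.allFinished()) {
    System.out.println("running jobs: " + control.getRunningJobs());
    Thread.sleep(5000);
  }
  control.stop();
}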

Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining

Dependent jobs I

Example
I count the number of packets per suspect address → one output file per category (spam, DoS, botnets)
I define a function for setting up the jobs (most of them share the same parameters)
I monitor the global execution by regularly displaying the names of the running jobs
I single machine: hard to see parallel jobs (increase the number of reduce tasks)

Java (recalls): I

generic type as function parameter: Class