Cloud Computing for Scalable Network Management
Jérôme François, [email protected]
NOMS, 04/20/2012
http://lorre.uni.lu/~jerome/files/tutoNOMS2012.pdf
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 2.5 license (http://creativecommons.org/licenses/by-nc-sa/2.5/), except screenshots from applications and when mentioned.
Outline
1 Introduction (Cloud computing? The tutorial)
2 Hadoop
3 In practice
4 Complex jobs
5 Applications
6 Tools
7 Installation
8 Alternatives
Definition of cloud computing
- Wikipedia definition: cloud computing refers to the provision of computational resources on demand via a computer network, such as applications, databases, file services, email, etc.
- Twenty-One Experts Define Cloud Computing, Jan. 09, http://cloudcomputing.sys-con.com/node/612375:
  - scalability on demand (within minutes or seconds)
  - cost aspects:
    - avoid huge investments (infrastructure, training...)
    - outsourced, pay-as-you-go service
  - web-based applications
  - virtualization: SaaS, PaaS, IaaS
  - easy deployment and configuration
  - ~ grid, but crash-proof
  - allow people to access (massively) technology-enabled services → all Internet applications
Definition of cloud computing
- Cloud Computing and Grid Computing 360-Degree Compared, I. Foster et al., GCE '08: "A large-scale distributed computing paradigm that is driven by economies of scale, in which a pool of abstracted, virtualized, dynamically-scalable, managed computing power, storage, platforms, and services are delivered on demand to external customers over the Internet."
- "on demand" here means within minutes
Reinvent the wheel?
- Differences with distributed computing / grid computing:
  - elasticity: resources are available "instantaneously"
  - the cloud may be more open (public cloud), unlike grid computing, which is ~ a private or community network
  - easy to use
  - scalability: shift from computing power to storage resources
Objective of the tutorial
- Explain the MapReduce paradigm
- Introduce Hadoop for running MapReduce jobs:
  - components
  - data flows
  - configuration
- Network management: data-intensive problem resolution
- Practical examples
- Alternatives
- Not covered: installation and management of cloud architectures, handling cloud-related issues (security, reliability...), but some basics to build up your own cluster or use Amazon Web Services
Materials
- My inspiration:
  - official websites:
    - http://hadoop.apache.org/
    - http://www.cloudera.com/
    - http://pig.apache.org/
    - http://hbase.apache.org/
  - books:
    - Hadoop in Action, Chuck Lam, Manning Publications
    - Pro Hadoop, Jason Venner, Apress
    - Hadoop: The Definitive Guide, Tom White, O'Reilly / Yahoo Press
2 Hadoop: Overview
What is it?
- Open-source framework
- Distributed applications for processing large data files
- Easy to use (and deploy) → no need to be an expert
- Robust: assumes that there are malfunctioning components
- Popular (http://wiki.apache.org/hadoop/PoweredBy):
  - non-experts / small companies
  - universities
  - large companies: Facebook, Google, eBay, Twitter...
Data-intensive processing
- Large data files handling = data-intensive processing
  - ≠ computationally intensive work
  - small processing on many pieces of information contained in data files
- Another way to distribute the work:
  - large data are usually already stored on clusters
  - the code is transferred where the data is
  - the code is executed where the data is
  - no data transfer → avoid network overhead
Data-intensive processing: network data volumes increase
- Cisco measured and forecasted Internet traffic [figure: PB/month per year, 1994-2014]
- 0.7 TB/day in Luxembourg
Scale-Up vs Scale-Out
- Scale-up:
  - add more resources on a single machine
  - very expensive: high performance for I/O → SSD, SAS, SATA 3.0 (hard drives + controllers)
- Scale-out:
  - add more machines with fewer resources
  - less expensive
- Example: read 10 TB

                    Channel throughput   Scaling        Time
  High-end server   200 MB/s             x2 channels    7:00
  Common machine    50 MB/s              x20 machines   2:45
History
- 2004: Google File System and MapReduce papers
- Implementation of MapReduce for Nutch (Apache project, an open-source search engine) → Hadoop is created by Doug Cutting
- 2006: Doug Cutting hired by Yahoo! to work on Hadoop
- 2008: 10,000+ core cluster in production for the Yahoo! search engine
- 2009: Amazon Elastic MapReduce is released and uses Hadoop
2 Hadoop: MapReduce
Approach
- Data processing model with two key components:
  - the mappers execute the program on data subsets
  - the reducers aggregate the results from the mappers
- Toy example: word count; here IPAddressCount
  - count the number of flows per source IP address

  Input flows                   Result
  IP Src    IP Dst              IP Src    Count
  1.1.1.1   2.2.2.2             1.1.1.1   2
  3.3.3.3   2.2.2.2             2.2.2.2   1
  3.3.3.3   4.4.4.4             3.3.3.3   2
  4.4.4.4   5.5.5.5             4.4.4.4   1
  2.2.2.2   4.4.4.4
  1.1.1.1   2.2.2.2
Src IP address count
- One loop over the lines + a hash table:

  for each li in lines:
      src_IP = get_src_IP(li)    # tokenization
      IP_count[src_IP]++
  end for

- Multiple files: one loop over the files + one loop over the lines + a hash table:

  for each f in files:
      for each li in lines(f):
          src_IP = get_src_IP(li)    # tokenization
          IP_count[src_IP]++
      end for
  end for
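For comparison, a minimal non-distributed sketch of the same computation in Java (assuming the flow records used later in the tutorial, i.e. one CSV line per flow with the source IP in the second field); this is exactly the hash table that the MapReduce version distributes:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class SequentialIpCount {
  public static void main(String[] args) throws Exception {
    Map<String, Integer> ipCount = new HashMap<String, Integer>();
    for (String file : args) {                     // one loop over the files
      BufferedReader in = new BufferedReader(new FileReader(file));
      String line;
      while ((line = in.readLine()) != null) {     // one loop over the lines
        String srcIp = line.split(",")[1];         // tokenization
        Integer c = ipCount.get(srcIp);
        ipCount.put(srcIp, c == null ? 1 : c + 1); // hash table update
      }
      in.close();
    }
    for (Map.Entry<String, Integer> e : ipCount.entrySet())
      System.out.println(e.getKey() + "\t" + e.getValue());
  }
}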
Map-Reduce and Src IP address count
- Multiple input files / file subsets
  [figure: one mapper per file builds a local count table and sends it to a single reducer, which merges the tables into the global per-IP counts]
- Issues:
  - file transfer overhead → the mapper program is transferred where the data is
  - 1 reducer = a single point of failure
  - each mapper has to memorize its hash table before sending it
Map-Reduce and Src IP address count
- Pseudo-code of the mappers:

  for each li in lines(f):
      src_IP = get_src_IP(li)    # tokenization
      IP_count[src_IP]++         # IP_count may be big in memory
  end for
  send(IP_count)                 # send to the reducer

- Pseudo-code of the reducer:

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for
Mapper optimization
- Don't wait for the end of the process to send the results:
  - for each parsed IP address, send the information with a count = 1
  - the reducer is already assigned to the aggregation task
  [figure: the mappers now emit one (IP, 1) pair per input line, e.g. (1.1.1.1,1), (3.3.3.3,1), (3.3.3.3,1), (4.4.4.4,1), (2.2.2.2,1), (1.1.1.1,1); the reducer still outputs the same global counts]
Map-Reduce and Src IP address count
- Pseudo-code of the mappers:

  for each li in lines(f):
      src_IP = get_src_IP(li)    # tokenization
      send([src_IP, 1])
  end for

- Pseudo-code of the reducer (unchanged):

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for
Multiple reducers
- Process the reducing task in parallel:
  - partition the output of the mappers, e.g. reducer 1: 1.1.1.1, 2.2.2.2; reducer 2: 3.3.3.3, 4.4.4.4
  - for a given IP address, all counts have to be sent to the same reducer (shuffle)
  - multiple outputs with disjoint results
  [figure: the mappers emit (IP, 1) pairs, the shuffle routes each IP to its reducer; reducer 1 outputs (1.1.1.1, 2), (2.2.2.2, 1) and reducer 2 outputs (3.3.3.3, 2), (4.4.4.4, 1)]
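A minimal sketch of such a partitioning function with the old (mapred) API, assuming the LongWritable source-IP keys used later in the IPCount example; Hadoop's default HashPartitioner behaves essentially like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class IpPartitioner implements Partitioner<LongWritable, IntWritable> {
  public void configure(JobConf job) { }  // nothing to configure here

  // same key -> same partition, so all counts of one IP reach the same reducer
  public int getPartition(LongWritable key, IntWritable value, int numReducers) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
  }
}

It would be registered with conf.setPartitionerClass(IpPartitioner.class).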
Multiple reducers
- Pseudo-code of the mappers (reducer_of is the partitioning function):

  for each li in lines(f):
      src_IP = get_src_IP(li)            # tokenization
      send([src_IP, 1], reducer_of(src_IP))
  end for

- Pseudo-code of each reducer (unchanged):

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for
Formal Map-Reduce
- Map-Reduce:
  - a set of mappers: Mappers
  - a set of reducers: Reducers
  - approach based on <key, value> pairs:
    - map input: <k1, v1> (k1: line number, filename... but rarely used for further processing)
    - intermediate pairs between mappers and reducers: <k2, v2>
    - reduce output: <k3, v3>
  - partitioner: k2 → Reducers
  [figure: list(<k1,v1>) → Mappers → list(<k2,v2>) → shuffle → Reducers → list(<k3,v3>)]
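Worked example (for the IPCount job detailed later): <k1, v1> = <line offset (LongWritable), line content (Text)> at the map input, <k2, v2> = <source IP (LongWritable), 1 (IntWritable)> between map and reduce, and <k3, v3> = <source IP (LongWritable), flow count (IntWritable)> at the reduce output.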
Formal Map-Reduce
- For every k2, the values v2 are aggregated into a single list:
  - network overhead reduction
  - the <k2, v2> pairs are sorted by key (then easy to aggregate):
    - partial sort on the mapper machine
    - final sort on the reducer machine
  - ~ the hash table on the mapper (one of the earlier issues):
    - managed by the (Hadoop) framework
    - optimizations already built in (buffers, mix between hard-drive and in-memory storage)
    - the final user/developer does not have to take care of it
  [figure: mappers emit list(<k2,v2>); sort and shuffle group them as (k2, [v2_0, v2_1, ...]) per key; reducers aggregate each list and emit list(<k3,v3>)]
The Black-Box
- Hadoop = MapReduce framework with many abstractions
- Easy to use for novices and advanced enough for experts:
  - programming is limited to the core of the mappers and reducers = the application-specific processing
  - the "black box" (split, sort, shuffle, aggregate) may be fine-tuned by configuration and programming
  [figure: only the mapper (k1,v1 → k2,v2) and the reducer (k2,v2 → k3,v3) have to be programmed; reading the input files, sorting, shuffling, aggregating and writing the output files are handled by the framework]
2 Hadoop: HDFS
The Need for a Specific File System
- HDFS: Hadoop Distributed File System
- Why a specific file system?
  - paradigm shift: send the code to the data → the data has to be distributed beforehand
- A file in HDFS is broken into multiple blocks:
  - each mapper processes the blocks it holds
  - blocks are replicated to keep redundancy (3 copies by default)
  [figure: one file is divided into blocks stored on the native file systems of several nodes, and the mappers run on the nodes holding the blocks]
Files, Blocks and Splits
- Blocks and splits:
  - a block is a physical division (64 MB by default)
  - a split is a logical division
  - they are not necessarily aligned → a line of a file may be spread over two blocks
- Using multiple files is still possible:
  - each file will be split into blocks
  - but Hadoop is more efficient with bigger files:
    - easily manage 100 TB or more in a single file
    - abstract the handling of large files away from the user
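Worked example: with the default 64 MB block size, a single 1 GB flow trace is stored as 16 blocks, so a job on it gets roughly 16 splits and 16 map tasks; the same data spread over 1000 files of 1 MB would produce at least 1000 map tasks (each file yields at least one split) with far more scheduling overhead, which is why fewer, bigger files are preferred.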
2 Hadoop: Components
HDFS
- NameNode = metadata of the HDFS (block division, replica placement):
  - manages replication
  - no data transfer through this node → avoids overhead
- DataNodes on slave machines:
  - file block I/O
  - continuously reporting to the NameNode
  [figure: the user contacts the NameNode only for metadata: (1) send data / get results request, (2) list of blocks/DataNodes to contact, (3) block I/O directly with the DataNodes; (2') replication requests between DataNodes, (3') data transfer, (1'/4') DataNode reports to the NameNode; each DataNode stores blocks on its local file system]
Map-Reduce
- JobTracker (on master node 2):
  - assigns the tasks
  - monitors the tasks and reassigns them on failure
- TaskTracker (on each slave node):
  - executes the tasks locally
  - reports to the JobTracker (heartbeat)
  - forwards the map outputs to the reducers selected by hash(key)
  [figure: the user submits a job to the JobTracker, which assigns map and reduce tasks to the TaskTrackers on the slave nodes and monitors them]
Global architecture
- NameNode: a single point of failure
  - a secondary NameNode takes HDFS metadata snapshots
  - manual intervention is needed to recover
  [figure: master node 1 runs the NameNode (HDFS metadata, block division), master node 2 runs the JobTracker (task assignment and monitoring), a backup node runs the secondary NameNode (metadata snapshots); each slave node runs a DataNode (local file system, HDFS blocks) and a TaskTracker (map and reduce tasks, local I/O); the user sends data / gets results through HDFS and submits jobs to the JobTracker]
2 Hadoop: Communications
Write data
- Node-to-node communications:
  - replication → creation of a pipeline of DataNodes
  - the client splits the file and requests the NameNode for each individual 64 MB block (steps 1 and 3 are repeated per block)
  - the NameNode provides a list of DataNodes to the client
  - an acknowledgment mechanism waits until all DataNodes in the pipeline are ready
  [figure: (1) the client sends a write request for Myfile.txt, blocks 1-3; (2) the NameNode replies with a DataNode list per block, e.g. block 1 → DN 1,3,4; (3)-(5) the write request is forwarded along the pipeline DN1 → DN3 → DN4; (6)-(8) the acknowledgments travel back to the client]
Write data
- Writing data:
  - data is transmitted through the pipeline
  - the "reverse" pipeline is used to propagate the acknowledgments of written data from each DataNode
  - the client reports to the NameNode once a block is written
  [figure: each block travels along its own DataNode pipeline (e.g. block 1 → DN 1,3,4) and the acknowledgment comes back along the reverse path]
Write data
- Writing data (sequence for one block):
  [figure: the client initializes the pipeline with a write request and gets a write ack, streams the data packets, receives one ack per packet back through the pipeline, closes the pipeline, gets the final write ack and reports the successful write to the NameNode]
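From the client's point of view the whole pipeline is hidden behind the HDFS Java API; a minimal write sketch (file name and content are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // reads core-site.xml
    FileSystem fs = FileSystem.get(conf);          // talks to the NameNode
    FSDataOutputStream out = fs.create(new Path("Myfile.txt"));
    out.writeBytes("1,1.1.1.1,2.2.2.2\n");         // data flows through the pipeline
    out.close();                                   // returns once the block is acknowledged
  }
}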
Read data
- Reading data:
  - locate the data: the NameNode provides a list of locations for each data block
  - get one replica of each block (the first location in the list) by requesting the DataNodes directly
  [figure: (1) the client sends a read request for Myfile.txt; (2) the NameNode replies with the DataNode list per block (block 1: DN 1,3,4; block 2: DN 2,3,4; block 3: DN 1,2,4); (3)-(8) the client requests each block from one of its DataNodes and receives the data directly]
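The corresponding client-side read, again as a minimal sketch: the NameNode lookup and the per-block DataNode requests all happen behind fs.open():

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    BufferedReader in = new BufferedReader(
        new InputStreamReader(fs.open(new Path("Myfile.txt"))));
    String line;
    while ((line = in.readLine()) != null)   // blocks are fetched from DataNodes on demand
      System.out.println(line);
    in.close();
  }
}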
MapReduce
- Map phase:
  - the user submits a job
  - the JobTracker requests TaskTrackers to map on local data
  - the TaskTrackers report their progress to the JobTracker
  [figure: (1) submit a job working on Myfile.txt; (2) the JobTracker asks the NameNode for the block locations; (3) block 1: DN 1,3,4, block 2: DN 2,3,4, block 3: DN 1,2,4; (4) map tasks are assigned to the TaskTrackers holding blocks 1, 2 and 3; (5/5') progress reports go back to the JobTracker and the user]
MapReduce
- Reduce phase:
  [figure: (6) the map outputs are sent to the reduce tasks; (7/7') progress reports go to the JobTracker and the user; (8) the reducers write their results to HDFS]
2 Hadoop: Reliability
Issues
- Availability of data:
  - replication...
  - ...OK, but how to make it efficient?
- Efficient task execution → how to select the TaskTrackers
- How to handle failures:
  - disk I/O, communications, failures in processes, non-responding nodes...
  - TCP-based Hadoop communication alone is not sufficient
Block allocation
- Avoid data loss → optimized replica locations:
  - replication consumes disk resources but also network resources → place replicas in close locations
  - replication should also protect against a global rack failure
- → rack awareness:
  - 1st and 3rd replicas in the same rack
  - 2nd replica in a different rack
  - other replicas in random racks
  - manual configuration (dfs.network.script)
  [figure: three racks connected by switches; keeping two replicas of a block in the same rack gives high performance, the replica in another rack gives rack-failure protection]
Heartbeat
- TCP heartbeat from the DataNodes every 3 seconds
- Block report every 10 heartbeats → the NameNode has a global view of data replication
  - → it requests DataNodes to copy under-replicated blocks (piggybacked on the heartbeat replies)
- Locality awareness:
  - HDFS reads go to the closest DataNodes
  - distance between nodes = sum of the distances to their common ancestor
  [figure: (1) DataNodes report their blocks, (2) the NameNode keeps the block/DataNode metadata, (3) it piggybacks copy requests for under-replicated blocks, (4) the blocks are streamed between DataNodes]
HDFS pipeline
- Acknowledgment mechanism:
  - two counters per node: the number of bytes received + the number of bytes acknowledged by the downstream DataNode
  - an ACK is sent only when the ACK from downstream is received
  - → consistency: data is visible (acknowledged to the NameNode) only when all DataNodes have it
  - failure → reconstruct the pipeline with the remaining nodes + a new generation timestamp
- Pipeline creation failure:
  - no ack / error ack = failure
  - → propagate the information along the "reverse" pipeline
  - ready DataNodes close the block file and the TCP connections
  - the client requests another set of DataNodes from the NameNode (for append: replace only the failed DataNodes)
Task assignment
- Heartbeat → HDFS metadata up to date + list of available nodes
- Fixed number of map and reduce slots per node
- Select mappers → locality awareness:
  - select a local mapper for each block if available
  - otherwise a close one (rack awareness in priority)
- Select reducers (no locality):
  - ~ random selection
  - next available reduce slots
- Task failure = exception thrown by the JVM or timeout:
  - reassign the task, avoiding nodes on which it has already failed
  - too many failures (mapred.[map|reduce].max.attempts) → stop the task
  - partial results may be useful → allow a level of failure (mapred.max.[map|reduce].failures.percent)
Job scheduling
- Default scheduler:
  - FIFO + a priority can be assigned to a job, but no preemption
  - not suited for multi-user deployments
- Fair scheduler (developed by Facebook):
  - jobs are grouped into pools (~ users) with conditions on slot allocation
  - performance → a job can take all available slots
  - fairness → preemption (task kill) to readjust slot usage according to the configuration
  - pools and their guarantees are described in an XML allocation file (sketch below)
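A minimal sketch of such an allocation file, assuming the fair-scheduler format shipped with Hadoop 0.20/CDH3 (the pool name, user name and values are illustrative, not taken from the original slide):

<?xml version="1.0"?>
<allocations>
  <pool name="research">
    <minMaps>5</minMaps>          <!-- guaranteed map slots -->
    <minReduces>5</minReduces>    <!-- guaranteed reduce slots -->
    <weight>2.0</weight>          <!-- share of the remaining slots -->
  </pool>
  <user name="jerome">
    <maxRunningJobs>6</maxRunningJobs>
  </user>
  <userMaxJobsDefault>3</userMaxJobsDefault>
</allocations>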
Job scheduling
- Capacity scheduler (developed by Yahoo!):
  - jobs are allocated to queues
  - each queue has a configurable number of slots
  - remaining slots are equally divided between queues
  - each queue is managed with FIFO + priorities
Secondary namenode
- NameNode:
  - volatile memory: snapshot of the current HDFS metadata → fast query replies
  - + 2 files:
    - FsImage: a past snapshot
    - EditLog: incremental changes
    - FsImage + EditLog → generation of an up-to-date snapshot (archival, initialization)
  - single point of failure
- Secondary namenode:
  - 1st objective: namenode recovery support and performance improvement:
    - huge EditLog on busy clusters
    - the primary NameNode is busy with real-time queries
    - the secondary NameNode is delegated the creation of snapshots
Secondary namenode
- Recovery = replace the NameNode by the secondary NameNode = manual intervention:
  - FsImage + EditLog live in the dfs.name.dir directory
  - the dfs.name.dir property is not used by the secondary NameNode → copy the NameNode files to it, e.g. via a mount point; dfs.name.dir can specify multiple directories for this purpose:

  <property>
    <name>dfs.name.dir</name>
    <value>localdir,remotedir</value>
  </property>

  - last step: change the hostname / IP address of (or reroute traffic to) the secondary NameNode so that it plays the role of the primary NameNode (the DataNodes cache the IP address!)
Other mechanisms
- Concurrent read and write issues:
  - write → obtain and renew a lease, preventing concurrent writes
  - heartbeat mechanism: preemption of the lease after inactivity, file closure after a long period (1 hour) of inactivity
  - concurrent reads are allowed
- Corrupted file blocks:
  - 1 data block → 1 checksum
  - read = (block + checksum) → verification by the client
  - if an error is found → take another replica and report the corrupted block so that a valid replica is restored
3 In practice: Environment
Installation
- Different installation modes:
  - fully distributed: master machine + backup machine + slave machines (for production)
  - standalone: single machine, no Hadoop daemon (for development/debugging purposes)
  - pseudo-distributed: a single machine running the different daemons (for development/debugging purposes, memory usage, interaction)
- Cloudera:
  - http://www.cloudera.com/
  - Hadoop-based services
  - Cloudera's Distribution including Apache Hadoop (CDH): Hadoop + plug-ins + packaging → easy to install/use/deploy
3 In practice: HDFS
Program Workflow
- Write data ~ upload data:
  - can be network intensive
  - multiple analyses on the same data
  - incremental updates
- Analysis might be (computationally) intensive
- Read / write = I/O on HDFS are essential
- HDFS cannot be accessed using standard UNIX commands → hadoop fs -cmd
  [figure: workflow = write data → analysis (possibly several) → read results]
Example
- core-site.xml:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>

- ls example (both commands are equivalent):
  - hadoop fs -ls
  - hadoop fs -ls hdfs://localhost:8020/
- Notes on the listing: by default the listing shows the user directory; the replication level is 1 here; directories have no replication factor (they are only structure)
Prepare data
- hadoop fs, hadoop fs -help ls

  Goal                                         Command
  display sample.txt                           hadoop fs -cat sample.txt
  get a local copy of sample.txt               hadoop fs -get sample.txt localSample.txt
  remove sample.txt                            hadoop fs -rm sample.txt
  create a directory for data                  hadoop fs -mkdir data
  copy flows1000000.csv into the data dir      hadoop fs -put flows1000000.csv data

- Sample file extracted from the dataset provided in: Sperotto, Anna; Sadre, Ramin; van Vliet, Frank; Pras, Aiko (2009) A Labeled Data Set For Flow-based Intrusion Detection. In: IP Operations and Management, Lecture Notes in Computer Science 5843, pp. 39-50. ISBN 978-3-642-04967-5
- Each line is a record: id, src ip, dst ip, #packets, #bytes, start time, start msec, end time, end msec, src port, dst port, tcp flags, proto
Monitoring
- Web GUI: http://localhost:50070
  - status/health of HDFS
  - free space: data size on the DFS vs. free physical space
  - HDFS browsing
  - failures
  - safe removal of nodes
3 In practice: MapReduce programming
Types
- Keys and values have specific types (serialization requirements):
  - package org.apache.hadoop.io
  - value types implement Writable
  - key types implement WritableComparable
  - the default types (WritableComparable) are wrappers for standard types: [Boolean|Byte|Double|Float|Int|Long]Writable, Text, NullWritable
  - defining custom types is possible (see the sketch below):
    - Writable → readFields, write
    - WritableComparable → + compareTo
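A minimal sketch of a custom key type (a hypothetical FlowKey grouping source and destination IP, not part of the tutorial code), implementing the serialization contract listed above:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowKey implements WritableComparable<FlowKey> {
  private long srcIp;
  private long dstIp;

  public void write(DataOutput out) throws IOException {    // serialization
    out.writeLong(srcIp);
    out.writeLong(dstIp);
  }
  public void readFields(DataInput in) throws IOException { // deserialization
    srcIp = in.readLong();
    dstIp = in.readLong();
  }
  public int compareTo(FlowKey o) {                          // needed to sort keys
    if (srcIp != o.srcIp) return srcIp < o.srcIp ? -1 : 1;
    if (dstIp != o.dstIp) return dstIp < o.dstIp ? -1 : 1;
    return 0;
  }
  // overriding hashCode() is also recommended so the default partitioner spreads keys well
}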
Types
- The MapReduce program has to read and write on HDFS → specific formats
- Input → InputFormat:

  Class                    Record      Key                           Value
  TextInputFormat          each line   line offset (LongWritable)    line content (Text)
  NLineInputFormat         each line   line offset (LongWritable)    line content (Text); similar to TextInputFormat, but each split is exactly N lines, N defined by mapred.line.input.format.linespermap
  KeyValueTextInputFormat  each line   before the separator (Text)   after the separator (Text); separator defined by key.value.separator.in.input.line (default \t)
  TeraInputFormat          each line   10 first chars (Text)         the rest of the line (Text)
Types
- Output → OutputFormat:

  Class                     Description
  TextOutputFormat          1 record (key tab value) per line (separator defined by textoutputformat.separator)
  SequenceFileOutputFormat  specific binary format for passing data between multiple MapReduce programs
  TeraOutputFormat          1 record (key value) per line
  NullOutputFormat          no output
Code Skeleton
- Only the mapper (k1,v1 → k2,v2) and the reducer (k2,v2 → k3,v3) have to be programmed; genericity is expressed through the interfaces:

public class MyMapReduce {
  public static class Map extends MapReduceBase
      implements Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, OutputCollector<K2, V2> output,
        Reporter reporter) throws IOException {
    }
  }

  public static class Reduce extends MapReduceBase
      implements Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output,
        Reporter reporter) throws IOException {
    }
  }
}
Code Skeleton
- OutputCollector is a wrapper for managing output:
  - output.collect(key, value)
- Reporter is used to send status messages along the process (advanced programming)
- Recent stable releases update the API (this code may not work in the future):
  - 0.20: still compatible
  - no compatibility in the future:
    - OutputCollector → Context
    - package structure changes
- A main method is required!
Code skeleton

public class MyMapReduce {
  public static class Map extends MapReduceBase implements Mapper<K1, V1, K2, V2> { }
  public static class Reduce extends MapReduceBase implements Reducer<K2, V2, K3, V3> { }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyMapReduce.class);
    conf.setJobName("MyMapReduceName");

    conf.setInputFormat(InputFormat.class);      // define input and output formats
    conf.setOutputFormat(OutputFormat.class);    // (has to be coherent with the types)
    conf.setOutputKeyClass(K3.class);            // define output key and value types
    conf.setOutputValueClass(V3.class);

    conf.setMapperClass(Map.class);              // define mappers and reducers
    conf.setReducerClass(Reduce.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));   // input data
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));  // where to store the output

    JobClient.runJob(conf);                      // run!
  }
}
Code skeleton
- The output types of the mapper can also be defined:
  - by default: same as the reducer
  - key: job.setMapOutputKeyClass()
  - value: job.setMapOutputValueClass()
- Don't forget to include the packages:
  - old/current API: the mapred package
  - new API: the mapreduce package

Old API:
  import java.io.IOException;
  import java.util.*;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.conf.*;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapred.*;
  import org.apache.hadoop.util.*;

New API:
  import java.io.IOException;
  import java.util.*;
  import java.util.regex.Pattern;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.conf.*;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapreduce.*;
  import org.apache.hadoop.mapreduce.lib.input.*;
  import org.apache.hadoop.mapreduce.lib.output.*;
  import org.apache.hadoop.util.*;
IPCount

public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private LongWritable ip = new LongWritable();

  public void map(LongWritable key, Text value,
      OutputCollector<LongWritable, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    Pattern regExpPattern = Pattern.compile(",");
    String items[] = regExpPattern.split(line);
    ip = new LongWritable(Long.parseLong(items[1]));
    output.collect(ip, one);
  }
}

public static class Reduce extends MapReduceBase
    implements Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
  public void reduce(LongWritable key, Iterator<IntWritable> values,
      OutputCollector<LongWritable, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}
IPCount

public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(IPCount.class);
  conf.setJobName("MyMapReduce");

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}
Compile and Run
1. compile
   - eclipse: automatic build or a click
   - terminal: javac -classpath /usr/lib/hadoop-0.20/hadoop-0.20.2-cdh3u0-core.jar path/IPCount.java
2. create a jar
   - eclipse: click, click and click
   - terminal: jar -cvf IPCount.jar -C DirectoryWithClasses DirectoryInTheJar
3. execute: hadoop jar IPCount.jar YourClassWithMain data out
4. look at the results in out
JobTracker
- Web GUI: http://localhost:50030
- status of the job and its tasks (progress, failures)
Code skeleton with the new API

public class MyMapReduceNewAPI {

  // genericity: the Context gets its types from the class declaration
  public static class Map extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
        throws IOException, InterruptedException { }
  }

  public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
        throws IOException, InterruptedException { }
  }

  public static void main(String[] args) throws Exception {
    // JobConf is divided between Job and Configuration
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Counting IP addr with the new API");
    job.setJarByClass(MyMapReduceNewAPI.class);
    job.setInputFormatClass(InputFormat.class);
    job.setOutputFormatClass(OutputFormat.class);
    job.setOutputKeyClass(K3.class);
    job.setOutputValueClass(V3.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //job.submit();
    job.waitForCompletion(true);
  }
}
Configuration
- Configuration files are defined by XML:

  <property>
    <name>PropName</name>
    <value>PropValue</value>
    <description>PropDesc</description>
  </property>

- JobConf / Configuration class:
  - properties of config files can be set/overridden or read: addResource(XMLfile), set(name, value), get(name)
  - runtime properties can be set: setNumReduceTasks(n), setNumMapTasks(n)
  - setNumMapTasks(n) is only a hint: the real number is derived from the split division (logical division into blocks, not exactly aligned) → mapred.min.split.size property in mapred-default.xml
  - a short sketch follows
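A short sketch of these mechanisms (the property name "sampling" is reused later in the tutorial; the file name my-site.xml is only an example):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class ConfigSketch {
  public static JobConf build() {
    JobConf job = new JobConf(ConfigSketch.class);
    job.addResource(new Path("my-site.xml"));  // load an extra XML configuration file
    job.set("sampling", "10");                 // set/override a property
    System.out.println(job.get("sampling"));   // read it back (mappers/reducers can too)
    job.setNumReduceTasks(10);                 // runtime property
    job.setNumMapTasks(100);                   // only a hint (see above)
    return job;
  }
}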
Fair scheduler installation
- contrib module:
  - HADOOP_HOME/contrib/fairscheduler (/usr/lib/hadoop/contrib/fairscheduler/hadoop-fairscheduler-0.20.2-cdh3u0.jar)
  - add the jar file to the default lib directory so it is loaded, or modify the classpath (HADOOP_CLASSPATH)
- mapred-site.xml:

  <property>
    <name>mapred.jobtracker.taskScheduler</name>
    <value>org.apache.hadoop.mapred.FairScheduler</value>
  </property>

- default pool name = user:

  <property>
    <name>mapred.fairscheduler.poolnameproperty</name>
    <value>user.name</value>
  </property>
Fair scheduler
- Web interface: http://localhost:50030/scheduler
3 In practice: Customization
Combiner
- IPCount: collects a lot of 1s → network overload
- Do a local aggregation:
  - combiner: a local reducer running on the same machine as the map → implements Reducer
  - the result has to be the same with or without instantiating the combiner
  - ≠ the earlier hash-table example: the combiner does not wait for all results (it works on buffered sub-lists)
  [figure: mappers + combiners emit (k2, sublist(v2)) instead of one (k2, v2) per record; sort, shuffle and reducers are unchanged]
- A combiner for IPCount → conf.setCombinerClass(Reduce.class)
Easy configuration
- GenericOptionsParser:
  - command-line argument interpretation
  - configuration: file, namenode (can be remote), jobtracker (can be remote), common files to deploy...
  - custom properties
- The Tool interface:
  - handles the mix between custom and standard arguments
  - used with Configured to implement basic getters and setters
  - ToolRunner uses GenericOptionsParser and Tool
  - the javadoc provides the standard way to program a MapReduce application (new API)
  - implementation of the run method (previously in main)
Easy configuration
- (old) API:

public int run(String[] args) throws Exception {
  Configuration conf = getConf();
  JobConf job = new JobConf(conf, MyMapReduce.class);
  Path in = new Path(args[0]);
  Path out = new Path(args[1]);
  FileInputFormat.setInputPaths(job, in);
  FileOutputFormat.setOutputPath(job, out);
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);
  job.setInputFormat(InputFormat.class);
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(K3.class);
  job.setOutputValueClass(V3.class);
  JobClient.runJob(job);
  return 0;
}

public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(), new MyMapReduce(), args);
  System.exit(res);
}
Easy configuration
- IPCount with ToolRunner:
  - int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args);
  - change properties from the command line, e.g. increase the number of reduce tasks:
    hadoop jar IPCount.jar IPCountConfig -D mapred.reduce.tasks=10 data out
    → results: one output file per reducer
  - use the first field of the csv file (an index) as the key for the mapper:
    - input as KeyValueTextInputFormat → it does the split...
    - ...but uses specific types → change the types in the mapper definition (Mapper<Text, Text, LongWritable, IntWritable>)
    hadoop jar IPCount.jar IPCountKeyInput -D mapred.reduce.tasks=1 -D key.value.separator.in.input.line=, data out
HDFS access
- ToolRunner → easily recycle the code based on the current context
- Network management/monitoring → repeat tasks and keep all, or only recent, analytics:
  - by default: no overriding of existing output files
  - → avoid manual HDFS preparation by the user (rmr)
  - API for HDFS: the FileSystem class

final FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
  fs.delete(new Path(args[1]), true);
}
Predefined Mappers and Reducers
- Hadoop provides standard Mappers and Reducers:
  - they may have predefined input/output types (use them carefully)
  - examples: TokenCountMapper (splits text into keys with a value of one), LongSumReducer (sums all values of one key), IdentityReducer (directly forwards the output of the mapper)
- The new API is currently less complete
  [figure: lists of the predefined classes in the old and new API]
Predefined Mappers and Reducers
- How to: IPCount with predefined mappers/reducers (a configuration sketch follows)
  - FieldSelectionMapReduce mapper to split the line using a separator
  - define the separator: job.set("mapred.data.field.separator", ",")
  - define the pairs: job.set("map.output.key.value.fields.spec", "1:1")
  - no useful reducer: LongSumReducer cannot handle Text (the type emitted by FieldSelectionMapReduce) → define a specific reducer
  - type errors between mapper and reducer ← the outputs of the mapper and the reducer are different (by default they are assumed identical) → define the types of the mapper: job.setMapOutputKeyClass(Text.class), job.setMapOutputValueClass(Text.class)
  - difference of output with different versions: different sorting if the key output type is Text
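Putting the bullets above together, a hedged configuration sketch with the old API (the class name is illustrative; the custom reducer is left as a comment):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;

public class IPCountPredefined {
  static void configure(JobConf job) {
    job.setMapperClass(FieldSelectionMapReduce.class);   // predefined mapper
    job.set("mapred.data.field.separator", ",");         // input field separator
    job.set("map.output.key.value.fields.spec", "1:1");  // key = field 1, value = field 1
    job.setMapOutputKeyClass(Text.class);                // mapper output types differ
    job.setMapOutputValueClass(Text.class);              //   from the reducer output types
    // reducer: a hand-written one (LongSumReducer cannot handle Text values),
    // e.g. job.setReducerClass(CountTextValuesReducer.class);
  }
}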
Predefined Mappers and Reducers
- Exercise: count the number of packets sent by each IP address, with predefined mappers/reducers + a combiner
  - mapper and reducer similar to the previous example
  - type error when using the reducer as the combiner → define a specific combiner (sketch below)
  [figure: the combiner is ~ a clone of the reducer but keeps the mapper's output types]
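A hedged sketch of such a dedicated combiner: it must accept and re-emit the mapper's <Text, Text> pairs (partial packet sums), unlike the final reducer, which emits the numeric totals; the class name is illustrative:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class PacketSumCombiner extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    long sum = 0;
    while (values.hasNext())
      sum += Long.parseLong(values.next().toString().trim()); // partial #packets sum
    output.collect(key, new Text(Long.toString(sum)));        // same types as the mapper
  }
}

It would be registered with job.setCombinerClass(PacketSumCombiner.class).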
Custom Parameters of Map Reduce
- Parameters can be retrieved by mappers and reducers:
  - get the JobConf object
  - public void configure(JobConf job) is called at initialization and may be customized
- Extended IPCount: apply sampling + compare → #pkts sum for every x flows of each IP address, multiplied by x
  - get the 3rd command-line argument and save it in the JobConf
  - read it within the configure method
  - the reducer computes the global sum as well as the sampled one, using the modulo operator (%)
Custom Parameters of Map Reduce

public static class Reduce ... {
  public void configure(JobConf job) {
    sampling = Integer.parseInt(job.get("sampling"));
  }
}

public int run(String[] args) throws Exception {
  job.set("sampling", args[2]);
}
3 In practice: Streaming
If you don’t like Java... I
I
Hadoop is developed in Java but MapReduce programming may use other languages Streaming I
I
I
use standard I/O: STDIN, STDOUT as input and output of the mapper and reducer every language can be used but has to be installed on all the cluster prior a dedicated jar hadoop-streaming-0.20.2-cdh3u0.jar:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -help
Usage: hadoop hadoop-streaming.jar [options]
Options:
  -input     DFS input file(s) for the Map step
  -output    DFS output directory for the Reduce step
  -mapper    The streaming command to run
  -combiner  The streaming command to run
  -reducer   The streaming command to run
90 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Inputs and Outputs
- Text records of the form: key <tab> value (the value may be empty)
- Generic options are supported → custom separators can be configured
Mapper:
- input: each line of the input data is fed to STDIN
- output: each line written to STDOUT
Reducer:
- input: the mapper outputs, sorted per key, are fed line by line to STDIN
- output: each line written to STDOUT is stored in the output file
[Diagram: streaming data flow — mappers take (k1,v1) pairs (multiple input files, keys are line numbers) and emit (k2,v2) pairs; after Sort/Shuffle/Aggregate, reducers take each k2 with its list of v2 values and emit (k3,v3) pairs into multiple output files; only the mapper and reducer commands are "to program"]
91 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
IPCount
IPCount using streaming capabilities + your preferred language
- if the script is not present on the remote computers → error; deploy it on the cluster with the -file option
- the mapper has to do the key/value split on its own
- different separators can be used: the output of the mapper is translated for the reducer
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -D stream.map.output.field.separator=, -input data -output out7 -mapper ’IPCountMR.py map’ -reducer ’IPCountMR.py reduce’ -file ’IPCountMR.py’
92 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Python code

import sys

def map():
    for line in sys.stdin:
        tab = line.split(",")
        print tab[1], ",", 1

def reduce():
    last_seen = None
    total = 0
    for line in sys.stdin:
        tab = line.split("\t")
        if last_seen == None or tab[0] != last_seen:
            if last_seen != None:
                print last_seen, ":", total
            last_seen = tab[0]
            total = 0
        total += 1
    print tab[0], ":", total

if __name__ == "__main__":
    if sys.argv[1] == "map":
        map()
    else:
        reduce()
93 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
IPCount
IPCount using streaming capabilities, without scripts
- use Unix commands:
  - the map output values sent to the reducer may be empty
  - cut + uniq
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out8 -mapper ’cut -d , -f 2’ -reducer ’uniq -c’
- the aggregate package:
  - the mapper has to specify the function to apply: function:key <tab> value
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u0.jar -input data -output out9 -mapper ’IPCountMapperAggregate.py’ -reducer aggregate -file ’IPCountMapperAggregate.py’
94 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Outline 1 Introduction 2 Hadoop 3 In practice Environment HDFS MapReduce Programming Customization Streaming Security 4 Complex jobs 5 Applications 6 Tools 7 Installation 95 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Rights management
HDFS files: user / group owner, read / write / execute permissions
- hadoop dfs command / Java API (a sketch of the Java side follows this slide):
  [-chmod [-R] PATH...]
  [-chown [-R] [OWNER][:[GROUP]] PATH...]
  [-chgrp [-R] GROUP PATH...]
Quotas: hadoop dfsadmin
- limit the number of files per directory → avoid too many files (Hadoop prefers big files!)
  [-setQuota ...] [-clrQuota ...]
- limit space usage
  [-setSpaceQuota ...] [-clrSpaceQuota ...]
96 / 242
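The Java API side mentioned above can be sketched as follows; the path, owner and group names are placeholders, and changing ownership requires superuser rights.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

// Sketch: programmatic equivalent of "hadoop dfs -chmod/-chown".
public class HdfsRights {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path dir = new Path("/user/alice/netflow");   // placeholder path

        // rwxr-x---: owner has full access, group can read/list, others nothing
        fs.setPermission(dir, new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE));
        // change owner and group (requires the HDFS superuser)
        fs.setOwner(dir, "alice", "netadmin");

        fs.close();
    }
}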
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
User management
Cluster usage per user:
- permissions + quotas = limit HDFS usage
- job scheduling = limit computational resource usage
→ security is enforced → balances cluster usage among trusted and non-hostile users (within a team, for example)
User authentication:
- OS user = Hadoop user, based on whoami and bash -c groups
- superuser = the user who started the Hadoop daemons
- impersonating users is very simple (change the whoami command)
Node-to-node communication: Hadoop customized Java RPC
- no authentication
- no encryption
97 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Attacker playground
A lot of room for attackers + "local" attacks if users can directly access the machines (shared machines)
[Diagram: the Hadoop architecture as attack surface — the user submits jobs and gets data/results from a unique master (master node 1: namenode, block division and maintenance; master node 2: jobtracker, task assignment and monitoring), a secondary namenode / backup node holding an HDFS metadata snapshot, and slave nodes each running a datanode (HDFS, local file system, local I/O) and a tasktracker (Map Reduce)]
98 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Security in Hadoop
Cloud computing and security = a big challenge
- CIA properties
- outsourced environment → threats from the service providers and the other clients
- many reports / much research on that = a large domain → not the objective of this presentation...
...but how to make Hadoop a bit more secure:
- restrict the clusters to a group of trusted users
- isolate the Hadoop machines (put them in dedicated VMs)
+ security features appeared recently in Hadoop (end of 2011)
99 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Environment HDFS MapReduce Programming Customization Streaming Security
Security in Hadoop
Kerberos
- Simple Authentication and Security Layer (SASL)
- Generic Security Services Application Program Interface (GSSAPI)
- mutual authentication: user/namenode + delegation tokens to limit the load on the KDC (Key Distribution Center)
[Diagram: Kerberos-based flow — (1) the user authenticates against Kerberos, (2) obtains a delegation token, (3) the namenode authenticates the user (MAC), (4) a block access token is requested, (5) the block is accessed on the datanode, (6) the datanode verifies the token; namenode and datanodes share a secret k; master node 1 = namenode (block division and maintenance), master node 2 = jobtracker (task assignment and monitoring), slave nodes = datanode (HDFS, local file system) + tasktracker (Map Reduce)]
100 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A
Outline 1 2 3 4 5 6 7 8
Introduction Hadoop In practice Complex jobs Applications Tools Installation Alternatives 101 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Outline 1 Introduction 2 Hadoop 3 In practice 4 Complex jobs Joining data sources Job chaining 5 Applications 6 Tools 7 Installation 8 Alternatives
102 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
The Problem
Join two sources of data on a shared key
Example:
- extract the Netflow records of suspect hosts
- data sources: Netflow records, list of suspect IP addresses
- shared key: the source IP address
Flows.csv:
888,3752921443,2463760020,14,1623,1222173626,357,1222173630,810,22,2295,27,6
889,3752919153,2463760020,14,1623,1222173626,359,1222173630,982,22,3054,27,6
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
891,3752921425,2463760020,16,1739,1222173626,364,1222173630,662,22,1715,27,6
892,3752921479,2463760020,14,1623,1222173626,364,1222173630,947,22,1894,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
894,2463760020,3752921531,12,1152,1222173627,237,1222173630,824,1048,22,27,6
895,2463760020,3752921533,12,1152,1222173627,288,1222173630,939,1413,22,27,6
896,3752921531,2463760020,14,2317,1222173627,475,1222173630,824,22,1048,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6
898,2463760020,3752980554,13,1204,1222173627,965,1222173630,399,2399,22,27,6
899,2463760020,3752980558,14,1256,1222173628,54,1222173630,418,1966,22,27,6

Suspects.txt:
3752920950
3752921533

Join result:
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6
103 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
DataJoin package
A specific package: DataJoin
- requirement: add the package to the classpath (hadoop-env.sh)
  export HADOOP_CLASSPATH="/usr/lib/hadoop/contrib/datajoin/hadoop-datajoin-0.20.2-cdh3u0.jar:$HADOOP_CLASSPATH"
- output of the map: tagged data (tag depends on the data source) + group keys
- input of a reducer: all tagged outputs for a single group key
[Diagram: mappers tag each record with its source — lines from Flows.csv become (tag=flows, group key=source IP, value=record), lines from Suspects.txt become (tag=addr, group key=IP); after shuffling, each reducer receives, for one group key (e.g. 3752920950), all tagged outputs from both sources]
104 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
DataJoin
A specific package: DataJoin
- reducer:
  - reduce(): for each group key, builds the cross-product of the values across tags (values carrying the same tag are taken at once)
  - combine() function: produces the final output, applied on each combination generated by reduce()
  → example: copy the flow record if its source IP address is in the suspect host list
[Diagram: for group key 3752920950, reduce() pairs each tagged flow record (flows, 3752920950, 2463760021 / 2463760023) with the suspect entry (addr, 3752920950); combine() is applied to each pair and emits the joined records 3752920950,2463760021 and 3752920950,2463760023; group key 3752919153 has only a flows record and produces no output]
105 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
TaggedMapOutput = map output data (key + value) + tag
- defines the type of the data (depends on the configuration) + getter + setter/constructor: Text
- serialization: write(), readFields() + a default constructor without argument (needed for serialization)
- Hadoop Writable types can be used for the tag and the data (Text)

public static class MyTaggedOutput extends TaggedMapOutput {
    private Type data;
    public MyTaggedOutput()
    public MyTaggedOutput(Writable val)
    public Writable getData()
    public void readFields(DataInput arg0) throws IOException
    public void write(DataOutput arg0) throws IOException
}
106 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
TaggedMapOutput = map output data (key + value) + tag

public static class MyTaggedOutput extends TaggedMapOutput {
    private Text value;
    public MyTaggedOutput() {
        value = new Text("");
    }
    public MyTaggedOutput(Text data, Text tag) {
        value = data;
        setTag(tag);
    }
    public Writable getData() {
        return value;
    }
    public void write(DataOutput out) throws IOException {
        tag.write(out);
        value.write(out);
    }
    public void readFields(DataInput in) throws IOException {
        tag.readFields(in);
        value.readFields(in);
    }
}
107 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
Mapper implementation:
1. generateInputTag() determines the tag (Text) from the input file
2. map():
   - generateTaggedMapOutput() wraps the map output (key, value) in a tagged output
   - generateGroupKey() generates the group key from a tagged output
Reminder: Strings cannot be compared with ==, and a Text cannot be compared to a String with equals() (convert with toString() first)

public static class Map extends DataJoinMapperBase {
    protected Text generateInputTag(String inFile)
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0)
    protected Text generateGroupKey(TaggedMapOutput arg0)
}
108 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
Mapper implementation:

public static class Map extends DataJoinMapperBase {
    protected Text generateInputTag(String inFile) {
        if (inFile.contains("data")) {
            return new Text("Traffic");
        } else {
            return new Text("Suspect");
        }
    }
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
        TaggedMapOutput retv = new MyTaggedOutput((Text) arg0, inputTag);
        return retv;
    }
    protected Text generateGroupKey(TaggedMapOutput arg0) {
        if (this.inputTag.toString().equals("Traffic"))
            return new Text(((Text) arg0.getData()).toString().split(",")[1]);
        else
            return (Text) arg0.getData();
    }
}
109 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
Reducer implementation: very easy! → only combine()
- applied to each group key, given a list of tags and values: tags[i] is associated with values[i]
- returns a tagged output (the tag itself is not important)
- final output: group key, value of the tagged output
- example: check the number of tags and output the value corresponding to the flow record

public static class Reduce extends DataJoinReducerBase {
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    }
}
110 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
Reducer implementation: very easy! → only combine()

public static class Reduce extends DataJoinReducerBase {
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length == 2) {
            if (tags[0].toString().equals("Suspect"))
                return new MyTaggedOutput((Text) ((MyTaggedOutput) values[1]).getData(), (Text) tags[0]);
            else
                return new MyTaggedOutput((Text) ((MyTaggedOutput) values[0]).getData(), (Text) tags[0]);
        }
        return null;
    }
}
111 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Implementation
main/run:
- standard definition (a sketch of the surrounding driver follows the snippet below)
- defines the right types: tagged output
FileInputFormat.setInputPaths(job, in);
FileInputFormat.addInputPath(job, in2);
FileOutputFormat.setOutputPath(job, out);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MyTaggedOutput.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MyTaggedOutput.class);
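The driver around these lines could follow the usual Tool/ToolRunner pattern; a minimal sketch, assuming a driver class called NetflowJoin and an (input, input, output) argument layout — both are assumptions, not the tutorial's exact code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch of the driver around the set* calls shown above (class name made up).
public class NetflowJoin extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Path in  = new Path(args[0]);   // Flows.csv
        Path in2 = new Path(args[1]);   // Suspects.txt
        Path out = new Path(args[2]);   // output directory

        JobConf job = new JobConf(getConf(), NetflowJoin.class);
        job.setJobName("netflow-join");
        // ... the setInputPaths / setMapperClass / set*Class calls shown above ...
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new NetflowJoin(), args));
    }
}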
112 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Poor performances
A nice, flexible and abstracted way to do a join operation, but... poor performances
- excuses that do not hold: virtual machine, no cluster...
- the join is only done at the reduce stage:
  - the output of the mapper is probably much larger than really needed
  - the output of the mapper has to be sorted and shuffled over the network
  - the join will create many more combinations than necessary
113 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Poor performances
Solutions:
- the join may be done by the mappers → extra operations to guarantee the accessibility of the data
- a better design for the usual case of joining one big file with multiple small ones → the small files can be replicated on every machine for the mappers (no huge overhead); a sketch follows this slide
[Diagram: the map side of the join — Flows.csv records and Suspects.txt entries are tagged (flows / addr) with the source IP as group key and the record as value]
114 / 242
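As a sketch of the second option, the small file can be shipped to every node with the DistributedCache; each mapper loads it in configure() and filters locally, so only matching records leave the map stage. The file name, class name and field position below are assumptions for illustration.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Map-side join sketch: the small suspect list is cached on every node.
public class SuspectFilterMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, NullWritable> {
    private final Set<String> suspects = new HashSet<String>();

    public void configure(JobConf job) {
        try {
            // In the driver: DistributedCache.addCacheFile(new URI("/data/Suspects.txt"), job);
            Path[] cached = DistributedCache.getLocalCacheFiles(job);
            BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
            String line;
            while ((line = reader.readLine()) != null) {
                suspects.add(line.trim());
            }
            reader.close();
        } catch (IOException e) {
            throw new RuntimeException("cannot load the suspect list", e);
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, NullWritable> output, Reporter reporter)
            throws IOException {
        // flow record: id,srcIP,dstIP,... → keep it only if srcIP is a suspect
        String srcIP = value.toString().split(",")[1];
        if (suspects.contains(srcIP)) {
            output.collect(value, NullWritable.get());
        }
    }
}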
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Outline 1 Introduction 2 Hadoop 3 In practice 4 Complex jobs Joining data sources Job chaining 5 Applications 6 Tools 7 Installation 8 Alternatives
115 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Simple chaining
- Sequential run of MapReduce applications (easiest design)
- Workflow: Map1 - Reduce1 - Map2 - Reduce2...
- One global application:
  - JobClient.runJob(job) waits for the end of the job before returning
  - → sequential calls to JobClient.runJob(job) (a minimal sketch follows this slide)
- Network monitoring use case:
  - #pkts counted per IP address
  - the user defines a file with IP addresses related to potential risks: spam, botnet, DoS
  - the #pkts are summed per risk
116 / 242
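A minimal sketch of this sequential pattern (class and path names invented): the output directory of the first job becomes the input of the second, and the intermediate directory is deleted once both jobs have finished.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ChainSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path in = new Path(args[0]);
        Path tmp = new Path("tmp-ipcount");   // intermediate directory (made up)
        Path out = new Path(args[1]);

        JobConf job1 = new JobConf(conf, ChainSketch.class);
        FileInputFormat.setInputPaths(job1, in);
        FileOutputFormat.setOutputPath(job1, tmp);
        // job1.setMapperClass(...); job1.setReducerClass(...);
        JobClient.runJob(job1);               // blocks until job1 is done

        JobConf job2 = new JobConf(conf, ChainSketch.class);
        FileInputFormat.setInputPaths(job2, tmp);
        FileOutputFormat.setOutputPath(job2, out);
        // job2.setMapperClass(...); job2.setReducerClass(...);
        JobClient.runJob(job2);               // runs only after job1

        FileSystem.get(conf).delete(tmp, true);   // clean the intermediate data
    }
}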
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Simple chaining
- First MapReduce job as before
- Output file of the 1st MapReduce = input file of the second = a temporary file → deleted at the end using FileSystem
- Second MapReduce:
  - get the suspect IP addresses via an XML configuration file: job.addResource(new Path(args[2]))
  - define configure(JobConf) in the mapper to retrieve the properties
  - one reducer per risk
117 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Simple chaining

// Config file (XML properties), e.g.: IP_botnet = 3709183202,3709183222,3709183228 (addresses of botnet), ...
public static class Reduce {
    public void reduce() throws IOException { /* sum of pkts per IP address */ }
}
public static class MapperAttacker {
    public void configure(JobConf job) { /* read the config file with the suspect IP addresses */ }
    public void map() throws IOException {
        /* for each IP address, if it is a suspect one, add its packets to the corresponding attack */
    }
}
public int run(String[] args) throws Exception {
    // Job 1: predefined mapper + custom reducer, writes to a temporary path
    JobConf job = new JobConf(conf, IPCountPktsAttacksChain.class);
    job.setMapperClass(FieldSelectionMapReduce.class);
    job.setReducerClass(Reduce.class);
    FileOutputFormat.setOutputPath(job, new Path(myTempPath));
    JobClient.runJob(job);
    // Chaining: Job 2 with additional properties for the job configuration
    JobConf job2 = new JobConf(conf, IPCountPktsAttacksChain.class);
    job2.addResource(new Path(args[2]));
    FileInputFormat.setInputPaths(job2, new Path(myTempPath));
    // Job 2: custom mapper + predefined reducer
    job2.setMapperClass(MapperAttacker.class);
    job2.setReducerClass(LongSumReducer.class);
    JobClient.runJob(job2);
    fs.delete(new Path(myTempPath), true);   // HDFS cleaning
}
118 / 242
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Dependent jobs
- Multiple jobs with dependencies → JobControl
- no direct execution as before with JobClient.runJob(JobConf)
- a Job needs to be created from the Map Reduce JobConf: new Job(JobConf)
- complex chaining:
  - add jobs: JobControl.addJob(Job)
  - add a dependency: job2.addDependingJob(job1)
(a sketch follows this slide)
[Diagram: simple chaining (a linear sequence, e.g. Map Reduce 2 → Map Reduce 3) versus a dependency graph of jobs (Map Reduce 2...6) where independent jobs run in parallel]
JobControl orders the jobs, implements Runnable (thread-based design) and can be monitored (running/failed jobs...)
119 / 242
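A sketch of this JobControl pattern, including a simple monitoring loop that regularly displays the running jobs; the group name and polling interval are arbitrary.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class DependentJobsSketch {
    public static void runAll(JobConf conf1, JobConf conf2) throws Exception {
        Job job1 = new Job(conf1);          // wrap the JobConf objects
        Job job2 = new Job(conf2);
        job2.addDependingJob(job1);         // job2 starts only after job1 succeeds

        JobControl control = new JobControl("pkt-count-per-risk");
        control.addJob(job1);
        control.addJob(job2);

        Thread runner = new Thread(control);   // JobControl implements Runnable
        runner.start();
        while (!control.allFinished()) {
            // simple monitoring: print the names of the jobs currently running
            for (Object o : control.getRunningJobs()) {
                System.out.println("running: " + ((Job) o).getJobName());
            }
            Thread.sleep(5000);
        }
        control.stop();
    }
}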
Introduction Hadoop In practice Complex jobs Applications Tools Installation A Joining data sources Job chaining
Dependent jobs
Example:
- count the number of packets per suspect address → one output file per category (spam, DoS, botnets)
- define a function for setting up the jobs (most of them share the same parameters); a sketch follows
- monitor the global execution by regularly displaying the names of the running jobs
- single machine: hard to see parallel jobs (increase the number of reduce tasks)
Java (recalls):
- generic type as function parameter: Class
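The "function for setting up the jobs" can take the per-job classes as Class parameters (hence the generics recall); a sketch with invented names, not the tutorial's exact helper:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.jobcontrol.Job;

// Helper sketch: shared configuration factored out, per-job classes passed in.
public class JobFactory {
    public static Job makeJob(Configuration conf, Class<?> driver,
            Class<? extends Mapper> mapper, Class<? extends Reducer> reducer,
            Path in, Path out) throws IOException {
        JobConf jc = new JobConf(conf, driver);
        jc.setMapperClass(mapper);
        jc.setReducerClass(reducer);
        FileInputFormat.setInputPaths(jc, in);
        FileOutputFormat.setOutputPath(jc, out);
        return new Job(jc);   // ready to be added to a JobControl
    }
}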