Map-Reduce and Hadoop

Jérôme François

[email protected]

May 2015


Outline
1 Introduction
2 Hadoop
3 Installation
4 In practice
5 Design Patterns
6 Applications
7 Tools


Big Data
- Technologies to handle large volumes of data → hard drives and fast controllers, databases, distributed filesystems...
- ... plus doing some computation over the data → parallel processing, grids, clouds, supercomputers...
- Similar fields exist, but Big Data focuses on the data:
  - computation on a small piece of data is usually trivial
  - but it needs to be done for many pieces
  - horizontal scaling


Definition
- There is no single standard definition
- Wikipedia definition: "Big data is the term for a collection of data sets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications. The challenges include capture, curation, storage, search, sharing, transfer, analysis, and visualization."
- Big data problems ∼ scalability problems


The Vs of Big Data
- Emergence of a list of characteristics of Big Data:
  - VOLUME: about 1 exabyte (10^18 bytes) created each day; 40 zettabytes (10^21 bytes) of created data in 2020; Facebook = 300 petabytes (10^15 bytes); Internet traffic = 43,569 petabytes; health care = 150 exabytes (2011)
  - VARIETY: social networking (Twitter = 500 million users), content providers (6 billion hours of video watched per month, 600 new websites per minute), network traffic (5 billion mobile phone users), 420 million wearable sensors (2014), 100 sensors in each modern car, 7.1 billion Internet users, e-shopping, banking, military, meteorology...
  - VELOCITY: 100 hours of video uploaded to YouTube every minute; YouTube Content ID scans over 400 years of video every day; Walmart: 1 million customer transactions every hour; 175 million tweets every day; New York Stock Exchange: 1 TB of trade information; Akamai: 20 million HTTP connections per second; Amazon: up to 600 transactions per second
- Sources: IBM, YouTube, Facebook, International Telecommunication Union, http://wikibon.org/blog/big-data-statistics/, SAS, Akamai
- Other Vs exist (we will see them later)


Targeted problems
- Generic input:
  - very large data volumes
  - unstructured or less structured data, or a flexible structure with unclear correlations
- Generic output:
  - general (aggregated) semantics of the input (important facts, trends, etc.)
- Method / algorithm:
  - data analytics should be guided by all the data (should not need an overly complex approach or much prior knowledge... but the reality is different)


Targeted problems 2
- Examples:
  - LinkedIn (75 TB of data): daily, customized update of the homepage of each user
  - YouTube video scanning: law enforcement + money for partners (protected content)
  - sales and market trend prediction, weather forecast, crime prediction, electric grid monitoring...
  - and IT security: Security Information and Event Management (SIEM), e.g. IBM QRadar
- Data semantics are evaluated globally → information can be approximated
- Big data ≠ applications dealing with exact information
- But the underlying data can be the same:
  - all individual stock transactions need to be stored without error
  - market prediction can still work if a few percent of the stock transactions are unknown (Veracity is another V)


Technical challenges
- Technical requirements:
  - storage and computation devices (hard drives, CPUs...) + network → datacenter design
- Software requirements:
  - distributed storage and computation (databases, frameworks, libraries...)
  - rethinking algorithms to be compatible with these software/approaches (for example, how to multiply two matrices using Map-Reduce)
- Online example: Akamai monitoring, http://www.akamai.com/html/technology/dataviz3.html


Big data requirements
- Big data applications need data:
  - collect and write
  - read and analyze
  - rarely delete
- Usually: data is written once (+ replicas) but read multiple times (thousands, millions or more)
- → distributed approaches to store data:
  - as files: distributed filesystems
  - or with more structure: distributed databases



Materials
- My inspiration:
  - Official websites:
    - http://hadoop.apache.org/
    - http://www.cloudera.com/
    - http://pig.apache.org/
    - http://hbase.apache.org/
  - Books:
    - Hadoop in Action, Chuck Lam, Manning Publications
    - Pro Hadoop, Jason Venner, Apress
    - Hadoop: The Definitive Guide, Tom White, O'Reilly / Yahoo Press



What is it?
- An open-source framework
- Distributed applications for processing large data files
- Easy to use (and deploy) → no need to be an expert
- Robust: assumes that malfunctions will occur
- Popular (http://wiki.apache.org/hadoop/PoweredBy):
  - non-experts / small companies
  - universities
  - large companies: Facebook, Google, eBay, Twitter...


Data-intensive processing
- Handling large data files = data-intensive processing:
  - ≠ compute-intensive work
  - a small amount of processing on many pieces of information contained in data files
  - another way to distribute the work:
    - large data sets are usually already stored on clusters
    - the code is transferred to where the data is
    - the code is executed where the data is
    - no data transfer → avoids network overhead


Data-intensive processing
- Network data volumes increase
- [Figure: Cisco measured and forecasted Internet traffic (about 1000 PB/day), shown in PB/month from 1994 to 2014, growing from near zero to roughly 60,000 PB/month]
- 0.7 TB/day in Luxembourg


Scale-Up vs Scale-Out
- Scale-up:
  - add more resources to a single machine
  - very expensive: high performance for I/O → SSD, SAS, SATA 3.0 (hard drives + controllers)
- Scale-out:
  - add more machines with fewer resources each
  - less expensive
- Example: read 10 TB (sanity-checked below)

                      High-end server   Common machine
  Channel throughput  200 MB/s          50 MB/s
  Scaling             x2 channels       x20 machines
  Time                7:00              2:45
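A quick sanity check of the table, assuming perfectly parallel channels/machines and ignoring seek and coordination overhead:

  10 TB ≈ 10,000,000 MB
  Scale-up:  10,000,000 MB / (200 MB/s × 2)  = 25,000 s ≈ 7 h
  Scale-out: 10,000,000 MB / (50 MB/s × 20)  = 10,000 s ≈ 2 h 45 min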


History
- 2004: Google File System and Map-Reduce
- Implementation of MapReduce for Nutch (an Apache project, open-source search engine) → Hadoop is created by Doug Cutting
- 2006: Doug Cutting is hired by Yahoo! to work on Hadoop
- 2008: a 10,000+ core cluster is in production for the Yahoo! search engine
- 2009: Amazon Elastic MapReduce is released and uses Hadoop


Approach
- Data processing model with two key components:
  - the mappers execute the program on data subsets
  - the reducers aggregate the results from the mappers
- Toy example: WordCount, IPAddressCount
  - count the number of flows per source IP address

  Input flows:              Output counts:
  IP Src    IP Dst          IP Src    Count
  1.1.1.1   2.2.2.2         1.1.1.1   2
  3.3.3.3   2.2.2.2         2.2.2.2   1
  3.3.3.3   4.4.4.4         3.3.3.3   2
  4.4.4.4   5.5.5.5         4.4.4.4   1
  2.2.2.2   4.4.4.4
  1.1.1.1   2.2.2.2


Src IP address count
- One loop over the lines + a hash table (a runnable Java version follows below):

  for each li in lines:
      src_IP = get_src_IP(li)    //Tokenization
      IP_count[src_IP]++
  end for

- Multiple files: one loop over the files + one loop over the lines + a hash table:

  for each f in files:
      for each li in lines(f):
          src_IP = get_src_IP(li)    //Tokenization
          IP_count[src_IP]++
      end for
  end for
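The same single-machine baseline as a minimal Java sketch (an illustrative program, not from the slides; it assumes comma-separated flow records with the source IP in the second field, as in the sample dataset used later):

  import java.io.BufferedReader;
  import java.io.FileReader;
  import java.util.HashMap;
  import java.util.Map;

  public class SequentialIPCount {
      public static void main(String[] args) throws Exception {
          Map<String, Long> ipCount = new HashMap<String, Long>();   // the hash table
          for (String file : args) {                                 // one loop over the files
              BufferedReader in = new BufferedReader(new FileReader(file));
              String line;
              while ((line = in.readLine()) != null) {               // one loop over the lines
                  String srcIP = line.split(",")[1];                 // tokenization (assumed field position)
                  Long c = ipCount.get(srcIP);
                  ipCount.put(srcIP, c == null ? 1L : c + 1L);
              }
              in.close();
          }
          for (Map.Entry<String, Long> e : ipCount.entrySet()) {
              System.out.println(e.getKey() + "\t" + e.getValue());
          }
      }
  }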


Map-Reduce and Src IP address count
- Multiple input files / file subsets
- [Figure: File 1 (1.1.1.1, 3.3.3.3), File 2 (3.3.3.3, 4.4.4.4, 2.2.2.2) and File 3 (1.1.1.1) are each handled by a mapper; the mappers emit partial counts such as 1.1.1.1,1 / 3.3.3.3,2 / 4.4.4.4,1 / 2.2.2.2,1 / 1.1.1.1,1, and a single reducer merges them into the final counts 1.1.1.1: 2, 2.2.2.2: 1, 3.3.3.3: 2, 4.4.4.4: 1]
- Issues:
  - file transfer overhead → the mapper program is transferred to where the data is
  - 1 reducer = a single point of failure
  - each mapper has to keep its whole hash table in memory before sending it


Map-Reduce and Src IP address count
- Pseudo-code of the mappers:

  for each li in lines(f):
      src_IP = get_src_IP(li)    //Tokenization
      IP_count[src_IP]++         //IP_count may be big in memory
  end for
  send(IP_count)                 //Send to the reducer

- Pseudo-code of the reducer:

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for


Mapper optimization
- Do not wait for the end of the processing to send the results:
  - for each parsed IP address, send the information with a count of 1
  - the reducer is already assigned to the aggregation task
- [Figure: the mappers emit one pair per record (1.1.1.1,1 / 3.3.3.3,1 / 3.3.3.3,1 / 4.4.4.4,1 / 2.2.2.2,1 / 1.1.1.1,1) and the reducer aggregates them into the final counts]


Map-Reduce and Src IP address count
- Pseudo-code of the mappers:

  for each li in lines(f):
      src_IP = get_src_IP(li)    //Tokenization
      send([src_IP, 1])
  end for

- Pseudo-code of the reducer = the same:

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for


Multiple reducers
- Process the reducing task in parallel:
  - partition the output of the mappers, e.g.:
    1. 1.1.1.1, 2.2.2.2
    2. 3.3.3.3, 4.4.4.4
  - for a given IP address, all counts have to be sent to the same reducer (shuffle)
  - multiple outputs with disjoint results
- [Figure: the mapper outputs (1.1.1.1,1 / 3.3.3.3,1 / 3.3.3.3,1 / 4.4.4.4,1 / 2.2.2.2,1 / 1.1.1.1,1) are shuffled to two reducers, which produce {1.1.1.1: 2, 2.2.2.2: 1} and {3.3.3.3: 2, 4.4.4.4: 1}]


Multiple reducers
- Pseudo-code of the mappers (the reducer_of function implements the partitioning; a sketch of such a function follows below):

  for each li in lines(f):
      src_IP = get_src_IP(li)                   //Tokenization
      send([src_IP, 1], reducer_of(src_IP))     //Send to the reducer chosen by the partitioning
  end for

- Pseudo-code of the reducer = the same:

  for each received_IP_count:
      for each key in received_IP_count:
          IP_count[key] = IP_count[key] + received_IP_count[key]
      end for
  end for
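A minimal sketch of what such a reducer_of partitioning function could look like (a hypothetical helper written for illustration, not a Hadoop API; Hadoop's own HashPartitioner follows the same idea):

  // Maps a key to one of numReducers reducers; identical keys always go to the same reducer.
  static int reducerOf(String srcIP, int numReducers) {
      return (srcIP.hashCode() & Integer.MAX_VALUE) % numReducers;
  }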


Formal Map-Reduce
- Map-Reduce:
  - a set of mappers: Mappers
  - a set of reducers: Reducers
  - approach based on ⟨key, value⟩ pairs:
    - map input: ⟨k1, v1⟩ (k1: line number, filename... but rarely used for further processing)
    - intermediate pairs between mappers and reducers: ⟨k2, v2⟩
    - reduce output: ⟨k3, v3⟩
  - partitioner: k2 → Reducers
- [Figure: list(⟨k1,v1⟩) (line numbers, ...) → Mappers → ⟨k2,v2⟩ → Shuffle → Reducers → list(⟨k3,v3⟩)]


Formal Map-Reduce
- ∀k2, the values v2 are aggregated into a single list:
  - network overhead reduction
  - the ⟨k2, v2⟩ pairs are sorted by key (then easy to aggregate):
    - partial sort on the mapper machine
    - final sort on the reducer machine
  - ∼ the hash table on the mapper (one of the earlier issues):
    - managed by the (Hadoop) framework
    - optimizations already built in (buffers, mix between hard-drive and in-memory storage)
    - the final user/developer does not have to take care of it
- [Figure: the mapper outputs are sorted and shuffled so that each reducer receives ⟨k2, list(v2)⟩ and emits list(⟨k3, v3⟩)]


The Black-Box
- Hadoop = a MapReduce framework with many abstractions
- Easy to use for novices and advanced enough for experts:
  - programming is limited to the core of the mappers and reducers = the processing specific to the application
  - the "Black-Box" may be fine-tuned by configuration and programming
- [Figure: only the mapper (⟨k1,v1⟩ → ⟨k2,v2⟩) and the reducer (⟨k2,list(v2)⟩ → ⟨k3,v3⟩) are to be programmed; splitting the input files into line-numbered records, sort, shuffle, aggregation and writing the output files are handled by the framework]


The Need for a Specific File System
- HDFS: Hadoop Distributed File System
- Why a specific file system?
  - paradigm shift: send the code to the data → the data has to be distributed beforehand
- A file in HDFS is broken into multiple blocks:
  - each mapper processes the blocks it holds
  - blocks are maintained to keep redundancy (3 copies)
- [Figure: one file is divided into Block 1, Block 2 and Block 3, stored on the native file systems of different nodes, and the mappers work on these local blocks]


Files, Blocks and Splits
- Blocks and splits:
  - a block is a physical division (64 MB)
  - a split is a logical division
  - they are not necessarily aligned → a line of a file may be spread over two blocks
- Using multiple files is still possible:
  - each file will be split into blocks
  - Hadoop is more efficient with bigger files:
    - easily manages 100 TB or more in a single file
    - abstracts the handling of large files from the user


HDFS
- NameNode = the metadata of the HDFS:
  - manages replication
  - no data transfer through this node → avoids overhead
- DataNodes on the slave machines:
  - file block I/O
  - continuously report to the NameNode
- [Figure: (1) the user sends data / gets results via the NameNode on master node 1, which maintains the block division and (2) answers with the blocks/DataNodes to request; (3) block I/O requests then go directly to the DataNodes on the slave nodes, which store blocks on their local file systems, handle replication requests and data transfers between themselves, and report back to the NameNode]


Map-Reduce
- JobTracker:
  - assigns the tasks
  - monitors the tasks and reassigns them in case of failure
- TaskTracker:
  - executes the tasks locally
  - reports to the JobTracker (heartbeat)
  - outputs the map results to the reducers (map output sent according to hash(key))
- [Figure: the user submits a job to the JobTracker on master node 2; the JobTracker assigns and monitors tasks on the TaskTrackers of the slave nodes, where the Map and Reduce tasks of the application run]


Global architecture
- NameNode: a single point of failure
  - a secondary NameNode (backup node) takes snapshots of the HDFS metadata
  - manual intervention is required to recover
- [Figure: unique-master setup — the user sends data to / gets results from the NameNode (master node 1, maintaining the block division) and submits jobs to the JobTracker (master node 2, doing task assignment and monitoring); the secondary NameNode stores HDFS metadata snapshots; each slave node runs a DataNode (local file system I/O) and a TaskTracker (Map and Reduce tasks of the application) side by side]


Context
- Map Reduce v1.0 is a strict design pattern where an application has mappers and reducers
- Many people do not follow it: map-only tasks, wanting to use HDFS just for its reliability, interactive needs...
- The JobTracker has limited scalability
- → Map Reduce v2 = YARN (Yet Another Resource Negotiator):
  - decouples the JobTracker into resource management and job scheduling/monitoring
  - main authority = the ResourceManager
  - per-application ApplicationMaster
  - per-node NodeManager
  - per-application Container


v1 vs. v2
- The ApplicationMaster negotiates resources and follows the execution of the Containers
- Still compatible with MRv1


YARN example
- Source: http://hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications
  1. job submission
  2. ApplicationMaster creation
  3. ApplicationMaster registration
  4. negotiation
  5. launch of the containers
  6. application execution in the containers + report to the ApplicationMaster
  7. reporting from the ApplicationMaster to the client
  8. unregistration


Setting up a cluster
1. Buy and install the hardware
2. Install the OS
3. Enable communications between the master and the other nodes
4. Install Hadoop
5. Run Hadoop
6. Enjoy!


Setting up a cluster
- Step 2: install the OS (Unix/Linux)
- Step 3: enable communication
  - Hadoop relies on the user management of the underlying system to start/stop daemons remotely:
    - a user account for hadoop (same name on all machines)
    - remote SSH access
    - user rights management (log files!) = a common source of many warnings
  - all other communications are done through RPC


Setting up a cluster
- Step 4: install Hadoop
  - download and uncompress an archive
  - install Java
  - configure:
    - highly tunable
    - cluster building: the nodes have to know each other
    - initialization of the different services (namenode, jobtracker, tasktracker, datanode)
    - namenode / datanode customization
    - jobtracker / tasktracker customization
    - remotely update a configuration on the entire cluster: rsync, dsh


Configuration
- (XML) configuration files:
  - in ${HADOOP_CONF_DIR} (/etc/hadoop/conf), symlink / alternatives framework
  - cluster config:
    - masters and slaves files
    - core-site.xml: general properties (log file max size, tmp directory)
    - hdfs-site.xml: local directories for data or metadata, replication level...
    - mapred-site.xml: jobtracker location (host/port), directories for intermediate files
    - MR v2 → yarn-site.xml
  - default files (*-default.xml): in the .jar file, should not be modified; properties have to be overridden
- Environment config: hadoop-env.sh
  - JAVA_HOME → path to Java
  - add extra classpaths


Machine Assignation
- secondary namenode: the masters file
- slaves (datanode + tasktracker) = the slaves file
- jobtracker and namenode are determined automatically when started (where the daemon is executed) + for the other nodes:
  - the jobtracker is specified as mapred.job.tracker in mapred-site.xml:

    <property>
      <name>mapred.job.tracker</name>
      <value>hadoop-host1:8021</value>
    </property>

  - the namenode is specified as fs.default.name in core-site.xml:

    <property>
      <name>fs.default.name</name>
      <value>hdfs://hadoop-host2:8020</value>
    </property>


Configuration Samples
- mapred-site.xml:

  <property>
    <name>mapred.job.tracker.http.address</name>
    <value>0.0.0.0:50030</value>
    <description>The job tracker http server address and port the server will listen on.
    If the port is 0 then the server will start on a free port.</description>
  </property>

- Hadoop can be fine-tuned:
  - but the packaged configurations are quite usable
  - tuning at the end of the tutorial (cluster setup)
  - documentation:
    - Cloudera user guide
    - Hadoop official documentation
    - comments in the default files


HDFS configuration

  Property                                   Goal
  fs.default.name                            namenode location
  dfs.name.dir                               local directories for metadata
  dfs.data.dir                               local directories for data
  dfs.replication                            number of replicas
  mapred.jobtracker                          jobtracker location
  mapred.local.dir                           intermediate results of MapReduce jobs
  mapred.tasktracker.map.tasks.maximum       maximum number of map tasks per tasktracker
  mapred.tasktracker.reduce.tasks.maximum    maximum number of reduce tasks per tasktracker
  dfs.http.address                           location of the namenode web interface
  dfs.datanode.http.address                  location of the datanode web interface

(A sample file using some of these properties follows below.)
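For illustration, a minimal hdfs-site.xml using some of the properties from the table above (the directory values are placeholders chosen for the example, not recommendations from the slides):

  <configuration>
    <property>
      <name>dfs.name.dir</name>
      <value>/var/hadoop/name</value>   <!-- local directories for metadata -->
    </property>
    <property>
      <name>dfs.data.dir</name>
      <value>/var/hadoop/data</value>   <!-- local directories for data -->
    </property>
    <property>
      <name>dfs.replication</name>
      <value>3</value>                  <!-- number of replicas -->
    </property>
  </configuration>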


Read the doc!
- Hadoop evolves fast = new/removed features
- Hadoop is big = impossible to list and remember all of them
- But the documentation is exhaustive: tunable properties, examples, API...
- Link: http://hadoop.apache.org/docs/r(version here)/
  e.g. http://hadoop.apache.org/docs/r2.5.0/


Installation
- Different installation modes:
  - fully distributed: master machine + backup machine + slave machines (for production)
  - standalone: a single machine, no Hadoop daemon (for development/debugging purposes)
  - pseudo-distributed: a single machine running the different daemons (for development/debugging purposes, memory usage, interaction)
- Cloudera:
  - http://www.cloudera.com/
  - Hadoop-based services
  - Cloudera's Distribution including Apache Hadoop (CDH): Hadoop + plug-ins + packaging → easy to install/use/deploy


Program Workflow
- Typical cycle: write data → analysis → read results, repeated over time
- Write data ∼ upload data:
  - can be network intensive
  - the analysis itself might be compute intensive
- Multiple analyses of the same data, incremental updates
- Read / write = I/O on HDFS is essential
- HDFS cannot be accessed using standard UNIX commands → hadoop fs -cmd


Example
- core-site.xml:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>

- ls example:
  - hdfs dfs -ls
  - hdfs dfs -ls hdfs://localhost:8020/
- Notes on the listing: by default it shows the user directory; the replication level is 1 for files; no replication is shown for directories (= structure only)


Prepare data
- hadoop fs, hadoop fs -help ls →

  Goal                                             Cmd
  display sample.txt                               hdfs dfs -cat sample.txt
  get a local copy of sample.txt                   -get sample.txt localSample.txt
  remove sample.txt                                -rm sample.txt
  create a directory for data                      -mkdir data
  copy flows100000.csv into the data directory     -put flows1000000.csv data

- Sample file extracted from the dataset provided in: Sperotto, Anna; Sadre, Ramin; van Vliet, Frank; Pras, Aiko (2009). A Labeled Data Set For Flow-based Intrusion Detection. In: IP Operations and Management, Lecture Notes in Computer Science 5843, pp. 39-50. ISBN 9783642049675
- Each line is a record: id, src ip, dst ip, #packets, #bytes, start time, start msec, end time, end msec, src port, dst port, tcp flags, proto


Monitoring
- Web GUI: namenode-location:50070 (single-node install: localhost:50070)
- status/health of HDFS
- free space
- HDFS browsing
- [Screenshot notes: data size on the DFS, free physical space, failures, safe removal of nodes]


Types
- Keys and values have specific types (serialization requirements):
  - package org.apache.hadoop.io
  - value types implement Writable
  - key types implement WritableComparable
  - the default types (WritableComparable) are wrappers for standard types: [Boolean|Byte|Double|Float|Int|Long]Writable, Text, NullWritable
  - defining custom types is possible (see the sketch below):
    - Writable → readFields, write
    - WritableComparable → + compareTo
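A minimal sketch of a custom key type (a hypothetical IpPairWritable holding a source and a destination address; it is not part of Hadoop and only illustrates the three methods mentioned above):

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;

  public class IpPairWritable implements WritableComparable<IpPairWritable> {
      private Text srcIp = new Text();
      private Text dstIp = new Text();

      public IpPairWritable() { }                               // Hadoop instantiates keys reflectively

      public void set(String src, String dst) { srcIp.set(src); dstIp.set(dst); }

      public void write(DataOutput out) throws IOException {    // serialization (Writable)
          srcIp.write(out);
          dstIp.write(out);
      }

      public void readFields(DataInput in) throws IOException { // deserialization (Writable)
          srcIp.readFields(in);
          dstIp.readFields(in);
      }

      public int compareTo(IpPairWritable o) {                  // ordering for the sort/shuffle (WritableComparable)
          int c = srcIp.compareTo(o.srcIp);
          return c != 0 ? c : dstIp.compareTo(o.dstIp);
      }

      @Override
      public int hashCode() {                                   // used by the default HashPartitioner
          return srcIp.hashCode() * 163 + dstIp.hashCode();
      }

      @Override
      public boolean equals(Object o) {
          return (o instanceof IpPairWritable)
                  && srcIp.equals(((IpPairWritable) o).srcIp)
                  && dstIp.equals(((IpPairWritable) o).dstIp);
      }
  }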


Types
- The MapReduce program has to read and write on HDFS → specific formats
- Input → InputFormat:
  - TextInputFormat: one record per line; key = line offset (LongWritable), value = line content (Text)
  - NLineInputFormat: like TextInputFormat, but each split is exactly N lines, defined by mapred.line.input.format.linespermap; key = line offset (LongWritable), value = line content (Text)
  - KeyValueTextInputFormat: one record per line; key = everything before the separator defined by key.value.separator.in.input.line (default \t) (Text), value = everything after the separator (Text)
  (A short configuration sketch follows below.)
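For example, switching a job to KeyValueTextInputFormat could look like the sketch below (an illustration, not from the slides; the exact separator property name differs between API versions, as noted later in this tutorial):

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  // Separator property for the old-style name shown above; values before/after it become key/value.
  job.getConfiguration().set("key.value.separator.in.input.line", ",");
  // The mapper then receives <Text, Text> pairs instead of <LongWritable, Text>.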


Types
- Output → OutputFormat:
  - TextOutputFormat: 1 record (key tab value) per line (the separator is defined by textoutputformat.separator)
  - SequenceFileOutputFormat: specific format for passing binary data between chained MapReduce programs
  - NullOutputFormat: no output


Code Skeleton
- [Figure: only the mapper (⟨k1,v1⟩ → ⟨k2,v2⟩) and the reducer (⟨k2,list(v2)⟩ → ⟨k3,v3⟩) are to be programmed; input splitting, sort, shuffle and aggregation are done by the framework]
- Genericity + interfaces (old API):

  public class MyMapReduce {
      public static class Map extends MapReduceBase implements Mapper<K1, V1, K2, V2> {
          public void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
                  throws IOException {
          }
      }

      public static class Reduce extends MapReduceBase implements Reducer<K2, V2, K3, V3> {
          public void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter)
                  throws IOException {
          }
      }
  }

- Use output.collect(K, V) to emit a (key, value) pair


Code Skeleton
- OutputCollector is a wrapper for managing the output:
  - output.collect(key, value)
- Reporter is used to send status messages along the process (advanced programming)
- Recent stable releases update the API (this code may not work in the future):
  - 0.20 is still compatible
  - no compatibility in the future:
    - OutputCollector → Context
    - package structure changes
- A main method is required!


Code skeleton

  public class MyMapReduce {
      public static class Map extends MapReduceBase implements Mapper<K1, V1, K2, V2> { }
      public static class Reduce extends MapReduceBase implements Reducer<K2, V2, K3, V3> { }

      public static void main(String[] args) throws Exception {
          JobConf conf = new JobConf(MyMapReduce.class);
          conf.setJobName("MyMapReduceName");
          // Define the input and output formats (they have to be coherent with the types)
          conf.setInputFormat(InputFormat.class);
          conf.setOutputFormat(OutputFormat.class);
          // Define the output key and value types
          conf.setOutputKeyClass(K3.class);
          conf.setOutputValueClass(V3.class);
          // Define the mappers and reducers
          conf.setMapperClass(Map.class);
          conf.setReducerClass(Reduce.class);
          // Define the input data and where to store the output
          FileInputFormat.setInputPaths(conf, new Path(args[0]));
          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
          // Run!
          JobClient.runJob(conf);
      }
  }


Code skeleton
- Output types of the mapper can also be defined:
  - by default: same as the reducer
  - key: job.setMapOutputKeyClass()
  - value: job.setMapOutputValueClass()
- Don't forget to include the packages:
  - old/current API: the mapred package
  - new API: the mapreduce package

  Old API:
  import java.io.IOException;
  import java.util.*;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.conf.*;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapred.*;
  import org.apache.hadoop.util.*;

  New API:
  import java.io.IOException;
  import java.util.*;
  import java.util.regex.Pattern;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.conf.*;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapreduce.*;
  import org.apache.hadoop.mapreduce.lib.input.*;
  import org.apache.hadoop.mapreduce.lib.output.*;
  import org.apache.hadoop.util.*;


Code skeleton with the New API

  public class MyMapReduceNewAPI {
      // Genericity: the Context gets its types from the class declaration
      public static class Map extends Mapper<K1, V1, K2, V2> {
          public void map(K1 key, V1 value, Context context)
                  throws IOException, InterruptedException { }
      }

      public static class Reduce extends Reducer<K2, V2, K3, V3> {
          // context.write(K, V) emits a (key, value) pair
          public void reduce(K2 key, Iterable<V2> values, Context context)
                  throws IOException, InterruptedException { }
      }

      public static void main(String[] args) throws Exception {
          // JobConf is divided between Job and Configuration
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf, "Counting IP addr with the new API");
          job.setJarByClass(MyMapReduceNewAPI.class);
          job.setInputFormatClass(InputFormat.class);
          job.setOutputFormatClass(OutputFormat.class);
          job.setOutputKeyClass(K3.class);
          job.setOutputValueClass(V3.class);
          job.setMapperClass(Map.class);
          job.setReducerClass(Reduce.class);
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          //job.submit();
          job.waitForCompletion(true);
      }
  }


Configuration
- The configuration files are defined in XML:

  <property>
    <name>PropName</name>
    <value>PropValue</value>
    <description>PropDesc</description>
  </property>

- JobConf (now directly in Job) / Configuration class:
  - properties of the config files can be set/overridden or read: addResource(XMLfile), set(name, value), get(name)
  - runtime properties can be set:
    - setNumMapTasks(n): only a hint, the real number is derived from the split division (has been removed) → mapred.min.split.size property in mapred-default.xml
    - setNumReduceTasks(n)


IPCount

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text ip = new Text();

      public void map(LongWritable key, Text value, Context context)
              throws IOException, InterruptedException {
          String line = value.toString();
          Pattern regExpPattern = Pattern.compile(",");
          String items[] = regExpPattern.split(line);
          ip = new Text(items[3]);
          context.write(ip, one);
      }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, LongWritable> {
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
          long sum = 0;
          for (IntWritable value : values) {
              sum += value.get();
          }
          context.write(key, new LongWritable(sum));
      }
  }


IPCount

  public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "Counting IP addr");
      job.setJarByClass(IPCountNewApi.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      job.waitForCompletion(true);
  }


Compile and Run
1. Compile:
   - Eclipse: automatic build or click
   - terminal: javac -classpath /usr/lib/hadoop-*/hadoop-*.jar path/IPCount.java (or set the classpath)
2. Create a jar:
   - Eclipse: click, click and click
   - terminal: jar -cvf IPCount.jar -C DirectoryWithClasses DirectoryInTheJar
3. Execute: hadoop jar IPCount.jar YourClassWithMain data out
4. Look at the results in out


JobTracker (MR v1)
- Web GUI: jobtracker-location:50030
- status of the job and its tasks (progress, failures)


JobTracker (MR v2)
- The JobTracker is split into three components:
  - MR-specific tasks like synchronization: the ApplicationMaster
  - job reporting: the ResourceManager (location:8088)
  - job history: the Job History Server (location:19888)


Combiner
- IPCount: collects a lot of 1s → network overload
- Do a local aggregation:
  - combiner: a local reducer on the same machine doing a map → implements Reducer
  - the results have to be the same with or without instantiating the combiner
  - ≠ the hash-table example: the combiner does not wait for all results (buffered)
- [Figure: mappers + combiners emit ⟨k2, sublist(v2)⟩; the sort/shuffle/aggregate step still turns this into ⟨k2, list(v2)⟩ for the reducers]
- A combiner with IPCount → job.setCombinerClass(Reduce.class)


Easy configuration
- GenericOptionsParser:
  - command-line argument interpretation
  - configuration: file, namenode (can be remote), jobtracker (can be remote), common files to deploy...
  - custom properties
- The Tool interface:
  - handles the mix between custom and standard arguments
  - used with Configured to implement a basic getter and setter
  - ToolRunner uses GenericOptionsParser and Tool
  - the javadoc provides the standard way to program a MapReduce application (new API)
  - implementation of the run method (previously in main)


Easy configuration
- Old API:

  public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      JobConf job = new JobConf(conf, MyMapReduce.class);
      Path in = new Path(args[0]);
      Path out = new Path(args[1]);
      FileInputFormat.setInputPaths(job, in);
      FileOutputFormat.setOutputPath(job, out);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      job.setInputFormat(InputFormat.class);
      job.setOutputFormat(TextOutputFormat.class);
      job.setOutputKeyClass(K3.class);
      job.setOutputValueClass(V3.class);
      JobClient.runJob(job);
      return 0;
  }

  public static void main(String[] args) throws Exception {
      int res = ToolRunner.run(new Configuration(), new MyMapReduce(), args);
      System.exit(res);
  }


Easy configuration
- New API:

  public class MyApp extends Configured implements Tool {
      public int run(String[] args) throws Exception {
          Configuration conf = this.getConf();
          Job job = Job.getInstance(conf, "Counting IP addr with the new API");
          job.setJarByClass(IPCountToolRunner.class);
          Path in = new Path(args[0]);
          Path out = new Path(args[1]);
          FileInputFormat.setInputPaths(job, in);
          FileOutputFormat.setOutputPath(job, out);
          job.setMapperClass(Map.class);
          job.setReducerClass(Reduce.class);
          job.setInputFormatClass(InputFormat.class);
          job.setOutputFormatClass(OutputFormat.class);
          job.setOutputKeyClass(K3.class);
          job.setOutputValueClass(V3.class);
          job.waitForCompletion(true);
          return 0;
      }

      public static void main(String[] args) throws Exception {
          // Let ToolRunner handle the generic command-line options
          int res = ToolRunner.run(new Configuration(), new IPCountToolRunner(), args);
          System.exit(res);
      }
  }


Easy configuration
- IPCount with ToolRunner:
  - int res = ToolRunner.run(new Configuration(), new IPCountConfig(), args);
  - change properties on the command line, e.g. increase the number of reduce tasks:
    - -D mapred.reduce.tasks=10 data out (old)
    - -D mapreduce.job.reduces=10 data out (new)
    - results → one file per reducer
  - use the first field of the csv file (an index) as the key for the mapper:
    - input as KeyValueTextInputFormat → it does the split...
    - ...but uses specific types → change the types in the mapper definition (Mapper<Text, Text, LongWritable, IntWritable>)
    - -D key.value.separator.in.input.line= t (old)
    - -D mapreduce.input.keyvaluelinerecordreader.key.value.s... (new)


HDFS access
- ToolRunner → easily recycle the code depending on the current context
- Repeat tasks and keep all analytics or only the most recent ones:
  - by default: files are not overwritten → avoid manual preparation of the HDFS by the user (rmr)
  - API for HDFS: the FileSystem class

  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
      fs.delete(new Path(args[1]), true);
  }


Predefined Mappers and Reducers
- Hadoop provides standard Mappers and Reducers:
  - they may have predefined input/output types (use them carefully)
  - examples: TokenCountMapper (splits text into keys with a value of one), LongSumReducer (sums all the values of one key), IdentityReducer (directly forwards the output of the Mapper)
- The new API is currently less complete (fewer predefined classes than the old one)


Predefined Mappers and Reducers
- How to: IPCount with predefined mappers/reducers
  - FieldSelectionMapReduce mapper to split the line using a separator
  - define the separator: job.set("mapred.data.field.separator", ",")
  - define the pairs: job.set("map.output.key.value.fields.spec", "1:1")
  - no useful predefined reducer: LongSumReducer cannot handle Text (the types output by FieldSelectionMapReduce) → define a specific reducer
  - type errors between the mapper and the reducer ← the output types of the mapper and of the reducer are different (by default they are assumed to be the same)
  - define the output types of the mapper: job.setMapOutputKeyClass(Text.class), job.setMapOutputValueClass(Text.class)
  - the output differs between versions: different sorting if the key output type is Text


Some changes in the new API
- The properties are set through the Job's configuration:
  - job.getConfiguration().set("mapreduce.fieldse...
  - job.getConfiguration().set("mapreduce.fieldsel.map.output.key.value.fields.spec", "1:1");


Predefined Mappers and Reducers
- Issues:
  - count the number of packets sent by each IP address
  - predefined mappers/reducers + combiner:
    - mapper and reducer similar to the previous ones
    - type error when using the reducer as the combiner → define a specific combiner (essentially a clone of the reducer that keeps the map output types; see the sketch below)
- [Figure: Map (FieldSelectionMapReduce, ⟨Text, Text⟩) → Combiner (clone, ⟨Text, Text⟩) → Reduce]
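A minimal sketch of such a dedicated combiner (a hypothetical class name; it assumes the FieldSelectionMapReduce mapper emits ⟨Text, Text⟩ pairs whose values hold packet counts):

  public static class PacketSumCombiner extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException {
          long sum = 0;
          for (Text v : values) {
              sum += Long.parseLong(v.toString().trim());
          }
          // Emit the same types as the map output, so the real reducer sees no difference.
          context.write(key, new Text(Long.toString(sum)));
      }
  }
  // job.setCombinerClass(PacketSumCombiner.class);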


Custom Parameters of Map Reduce
- Parameters can be retrieved by the mappers and reducers:
  - get the JobConf object
  - public void configure(JobConf job) is called at initialization and may be customized
  - with the new API: context.getConfiguration() in map()... or at initialization in setup(Context)
- Extended IPCount:
  - apply sampling + compare → sum of #pkts over every x-th flow of each IP address, ×x
  - get the 3rd command-line argument and save it in the JobConf/Configuration
  - get it within the configure method or through the context
  - the reducer computes the global sum as well as the sampled one, using the modulo operator (%)


Custom Parameters of Map Reduce

  // Old API
  public static class Reduce {
      public void configure(JobConf job) {
          sampling = Integer.parseInt(job.get("sampling"));
      }
  }
  public int run(String[] args) throws Exception {
      job.set("sampling", args[2]);
  }

  // New API
  public static class Reduce extends Reducer {
      static int sampling = 10;
      protected void setup(Context context) throws IOException, InterruptedException {
          Configuration conf = context.getConfiguration();
          sampling = conf.getInt("conf.sampling", 1);
      }
  }
  public int run(String[] args) throws Exception {
      Configuration conf = this.getConf();
      conf.setInt("conf.sampling", Integer.parseInt(args[2]));
  }


If you don't like Java...
- Hadoop is developed in Java, but MapReduce programming may use other languages
- Streaming:
  - uses standard I/O (STDIN, STDOUT) as the input and output of the mapper and the reducer
  - any language can be used, but it has to be installed on the whole cluster beforehand
  - a dedicated jar, e.g. hadoop-streaming-0.20.2-cdh3u0.jar:

  hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -help
  Usage: hadoop hadoop-streaming.jar [options]
  Options:
    -input      DFS input file(s) for the Map step
    -output     DFS output directory for the Reduce step
    -mapper     The streaming command to run
    -combiner   The streaming command to run
    -reducer    The streaming command to run


Inputs and Outputs
- Text records as: key <tab> value (the value may be empty)
- Generic options are supported → custom separator
- Mapper:
  - input: each line of the input data is passed to STDIN
  - output: each line of STDOUT
- Reducer:
  - input: the outputs of the mappers, sorted by key, are passed line by line to STDIN
  - output: each line of STDOUT is written to the output file
- [Figure: the streaming mapper and reducer take the place of the "to program" boxes in the usual MapReduce pipeline]


IPCount
- IPCount using the streaming capabilities + your preferred language:
  - the script is not present on the remote computers → error
  - deploy the script on the cluster: option -files
  - the mapper has to do the key/value split on its own
  - different separators can be used: the output of the mapper is translated for the reducer (default is tab)
  - order of the parameters matters! (first the generic hadoop options (-D, -files), then the streaming arguments)

  hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
      -D stream.map.output.field.separator=, \
      -files='training/src/IPCount.py' \
      -input flows1000.csv -output streaming2 \
      -mapper 'IPCount.py map' -reducer 'IPCount.py reduce'

Python code

import sys

def map():
    for line in sys.stdin:
        tab = line.split(",")
        print tab[1], ",", 1

def reduce():
    last_seen = None
    total = 0
    for line in sys.stdin:
        tab = line.split("\t")
        if last_seen == None or tab[0] != last_seen:
            if last_seen != None:
                print last_seen, ":", total
            last_seen = tab[0]
            total = 0
        total += 1
    print tab[0], ":", total

if __name__ == "__main__":
    if sys.argv[1] == "map":
        map()
    else:
        reduce()

IPCount
- IPCount using streaming capabilities without scripts
  - use the Unix commands
    - the map output value sent to the reducer may be empty
    - cut + uniq

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input data -output out8 -mapper 'cut -d , -f 2' -reducer 'uniq -c'

  - the aggregate package
    - the mapper has to specify the function to apply (e.g. LongValueSum): output = function:key <tab> value

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -files 'IPCountMapperAggregate.py' -input data -output out9 -mapper 'IPCountMapperAggregate.py' -reducer aggregate

Outline
1 Introduction
2 Hadoop
3 Installation
4 In practice
5 Design Patterns
  Aggregation
  Job chaining
  Joining data sources
6 Applications
7 Tools

The pattern
- Similar to a GROUP BY in SQL
- Apply an aggregate function on all grouped records (sum, average, min, max...)
- Easy!
  - IPCount or WordCount are of this type
  - Mapper output = a key (the field to use for the group by) + a value (the data to aggregate)

Multiple aggregation
- Do multiple aggregations
- Use a combiner for optimization → the combiner output will be aggregated results
- Avoid using Text and parsing (except for the initial reading)
- keys and values = structured data
  - use string + parsing → bad design
  - use predefined types for values + custom types for keys

Custom types definition
- Exercise: get the min, max and average number of bytes per source IP address (use the same Combiner as Reducer)
- Recall: specific interfaces to implement
  - key: WritableComparable
  - value: Writable
  - + default constructor without parameters, for reflection
  - define toString() for customizing the output
  - (a possible completion is sketched below)

public static class MinMaxSum implements Writable {
    private double min;
    private double max;
    private double sum;
    public MinMaxSum() {}
    public void readFields(DataInput arg0) throws IOException {}
    public void write(DataOutput arg0) throws IOException {}
}
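A minimal completion of this skeleton, as one possible sketch: the field order in readFields/write, the merge() helper and the initial values are assumptions, not the course's reference solution.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical completion: value type carrying the min, max and sum of bytes.
public class MinMaxSum implements Writable {
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;
    private double sum = 0;

    public MinMaxSum() {}                        // required for reflection

    public MinMaxSum(double v) { min = v; max = v; sum = v; }

    // Merge another partial result (usable identically in the Combiner and the Reducer).
    public void merge(MinMaxSum o) {
        min = Math.min(min, o.min);
        max = Math.max(max, o.max);
        sum += o.sum;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        min = in.readDouble();
        max = in.readDouble();
        sum = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(min);
        out.writeDouble(max);
        out.writeDouble(sum);
    }

    @Override
    public String toString() {                   // controls the text written in the output file
        return min + "\t" + max + "\t" + sum;
    }
}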

Custom types definition
- compareTo(Object o)
  - necessary and important function for WritableComparable (key)
  - used during the sort phase for preparing the input of the reducers
  - usual way to define it: compare the individual fields

public int compareTo(MyCustomType o) {
    // just design the tests to fit your needs
    if ...
        return -1;
    if ...
        return 1;
    else
        return 0;
}

The average
- Exercise: compute the average number of bytes per IP address
  - still with the same Reducer as the Combiner, to avoid sending all individual records to the reducers
  - average is not associative → plain local averages cannot simply be recombined at the reducer
  - construct a local average but keep a way to reconstruct the overall sum afterwards
  - create a specific type with a 2-tuple as a value: (average, number of elements) — see the sketch below
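A sketch of such a pair value (the class name AvgCount and the merge() helper are illustrative): because each partial result carries its element count, two (average, count) pairs can be merged exactly, so the same class works as Combiner and Reducer logic.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical value type: a local average plus the number of elements it covers.
public class AvgCount implements Writable {
    private double avg = 0;
    private long count = 0;

    public AvgCount() {}                          // required for reflection
    public AvgCount(double value) { avg = value; count = 1; }

    // Exact merge of two partial averages, weighted by their counts.
    public void merge(AvgCount o) {
        long total = count + o.count;
        if (total > 0) {
            avg = (avg * count + o.avg * o.count) / total;
        }
        count = total;
    }

    public void readFields(DataInput in) throws IOException {
        avg = in.readDouble();
        count = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeDouble(avg);
        out.writeLong(count);
    }

    @Override
    public String toString() { return Double.toString(avg); }
}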

The mean
- Exercise: compute the mean of the number of packets per IP address
  - need a specific Combiner ≠ Reducer, but still a combiner to avoid sending all data
  - compress the list of values into a list of 2-tuples
  - a custom type for values: (number of packets, number of instances)
  - the combiner can also pre-sort the data

A multiple key
- Exercise: compute the mean of the number of packets per source and destination IP address
  - use WritableComparable to create a custom key type (see the sketch below)
  - compareTo + the hashCode function: public int hashCode() → you can use the hashCode functions of standard Java types to create yours
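A minimal sketch of such a composite key (the class and field names are illustrative): compareTo drives the sort phase and hashCode drives the default partitioner.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: (source IP, destination IP) in the numeric form used in the flow files.
public class IpPair implements WritableComparable<IpPair> {
    private long src;
    private long dst;

    public IpPair() {}                              // required for reflection
    public IpPair(long src, long dst) { this.src = src; this.dst = dst; }

    public void readFields(DataInput in) throws IOException {
        src = in.readLong();
        dst = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(src);
        out.writeLong(dst);
    }

    @Override
    public int compareTo(IpPair o) {                // field-by-field comparison, as suggested above
        int c = Long.compare(src, o.src);
        return (c != 0) ? c : Long.compare(dst, o.dst);
    }

    @Override
    public int hashCode() {                         // reuse the hashCode of standard Java types
        return Long.valueOf(src).hashCode() * 163 + Long.valueOf(dst).hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return (o instanceof IpPair) && ((IpPair) o).src == src && ((IpPair) o).dst == dst;
    }

    @Override
    public String toString() { return src + "," + dst; }
}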


Simple chaining
- Sequential run of MapReduce applications (easiest design)
- Workflow: Map1 - Reduce1 - Map2 - Reduce2...
- One global application:
  - job.waitForCompletion(true) waits for the end of the job before returning
  - → sequential calls to job.waitForCompletion(true)
  - the returned value can be tested before continuing
- Network monitoring use case
  - #pkts count per IP address
  - the user defines a file with IP addresses related to potential risks: spam, botnet, DoS
  - the #pkts are summed per risk

Simple chaining
- First MapReduce as previously
- Out file of the 1st MapReduce = In file of the 2nd MapReduce = temporary file → deleted at the end using FileSystem
- Second MapReduce
  - share a common knowledge: DistributedCache or an XML configuration file
    - conf.addResource(new Path(args[2]))
    - use the context in the mapper for retrieving the properties
  - one reducer per risk

Simple chaining (Old Api)

Config file (args[2]): IP_botnet = 3709183202,3709183222,3709183228 (addresses of botnet) ...

public static class Reduce ... {
    public void reduce(...) throws IOException {
        // sum of pkts per IP address
    }
}

public static class MapperAttacker ... {
    public void configure(JobConf job) {
        // read the config file with the suspect IP addresses
    }
    public void map(...) throws IOException {
        // for each IP address, if it is a suspect one,
        // add its packets to the corresponding attack
    }
}

public int run(String[] args) throws Exception {
    // Job 1
    JobConf job = new JobConf(conf, IPCountPktsAttacksChain.class);
    job.setMapperClass(FieldSelectionMapReduce.class);      // predefined Mapper
    job.setReducerClass(Reduce.class);
    FileOutputFormat.setOutputPath(job, new Path(myTempPath));
    JobClient.runJob(job);

    // Chaining: Job 2, with additional properties for the job configuration
    JobConf job2 = new JobConf(conf, IPCountPktsAttacksChain.class);
    job2.addResource(new Path(args[2]));
    FileInputFormat.setInputPaths(job2, new Path(myTempPath));
    job2.setMapperClass(MapperAttacker.class);
    job2.setReducerClass(LongSumReducer.class);              // predefined Reducer
    JobClient.runJob(job2);

    fs.delete(new Path(myTempPath), true);                   // HDFS cleaning
}

Simple chaining (New Api)

Config file (args[2]): IP_botnet = 3709183202,3709183222,3709183228 (addresses of botnet) ...

public static class Reduce ... {
    public void reduce(...) throws IOException {
        // sum of pkts per IP address
    }
}

public static class MapperAttacker ... {
    public void map(...) throws IOException {
        // for each IP address, if it is a suspect one,
        // add its packets to the corresponding attack
        ...context.getConfiguration().get("IP_botnet")...
    }
}

public int run(String[] args) throws Exception {
    // Job 1
    Job job = Job.getInstance(conf, "1st job");
    job.setMapperClass(FieldSelectionMapReduce.class);       // predefined Mapper
    job.setReducerClass(Reduce.class);
    FileOutputFormat.setOutputPath(job, new Path(myTempPath));
    job.waitForCompletion(true);

    // Chaining: Job 2, with additional properties for the job configuration
    conf.addResource(new Path(args[2]));
    Job job2 = Job.getInstance(conf, "2nd job");
    FileInputFormat.setInputPaths(job2, new Path(myTempPath));
    job2.setMapperClass(MapperAttacker.class);
    job2.setReducerClass(LongSumReducer.class);               // predefined Reducer
    job2.waitForCompletion(true);

    fs.delete(new Path(myTempPath), true);                    // HDFS cleaning
}

Think about complexity
- what design pattern did you use? join!
- Where to aggregate data?
  - mapper or reducer?
  - mapper → reduces the reducer input
  - ... but maybe a pre-aggregation (sum) is better → depends on the data size (avoid that all spam-related IP addresses are summed by a unique reducer) — see the in-mapper sketch below
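A sketch of pre-aggregating in the mapper (often called in-mapper combining); the class name, field indices of the flow file and the variable names are assumptions. The idea is simply to sum locally in a map and flush it in cleanup(), so far fewer records reach the shuffle.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: sums packet counts per IP locally before emitting anything.
public class InMapperSumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Map<String, Long> partialSums = new HashMap<String, Long>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String ip = fields[1];                        // assumed source-IP column
        long pkts = Long.parseLong(fields[3]);        // assumed packet-count column
        Long current = partialSums.get(ip);
        partialSums.put(ip, (current == null ? 0 : current) + pkts);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one record per IP seen by this mapper instead of one per input line.
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            context.write(new Text(e.getKey()), new LongWritable(e.getValue()));
        }
    }
}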

Dependent jobs (Old Api)
- Multiple jobs with dependency → JobControl
- no direct execution as JobClient.runJob(JobConf)
- needs to create a Job from the JobConf: new Job(JobConf)
- add jobs: JobControl.addJob(Job)
- add a dependency: job2.addDependingJob(job1)
- JobControl orders the jobs, implements Runnable (thread-based design) and can be monitored (running/failed jobs...)

[figure: simple chaining = a linear sequence of MapReduce jobs; complex chaining = a dependency graph where several MapReduce jobs run in parallel before a final one]

Dependent jobs (New Api)
- Multiple jobs with dependency → JobControl
- no direct execution as job.waitForCompletion(true)
- needs to create a ControlledJob to embed the job: new ControlledJob(conf); cj.setJob(job);
- create a job controller: new JobControl("My JC");
- add jobs: myJobControl.addJob(cj);
- add a dependency: cj2.addDependingJob(cj1);
- JobControl orders the jobs, implements Runnable (thread-based design → start it) and can be monitored (running/failed jobs...)

[figure: simple chaining = MapReduce 1 → 2 → 3 in sequence; complex chaining = a graph where independent MapReduce jobs run in parallel before a final one]

Dependent jobs
- Example
  - count the number of packets per suspect address → one output file per category (spam, DoS, botnets)
  - define a function for setting up the jobs (most of them share the same parameters)
  - monitor the global execution by regularly displaying the names of the running jobs
  - single machine: hard to see parallel jobs (increase the number of reduce tasks)
- Java (recalls):
  - generic type as function parameter: Class<? extends Mapper> mymapper → mymapper can be any subclass of Mapper
  - Thread t = new Thread(Runnable); t.start();

Dependent jobs (New Api)

[figure: job graph — "Count Packets" runs first, then "Extract Bot @", "Extract Spam @" and "Extract DoS @" run in parallel on its output]

public static class MapperBotnet ... {
    public void configure(JobConf job) {
        // read the config file with the IP addresses of the attackers
    }
    public void map(...) throws IOException {
        // output bot IP addresses with their packet counts
    }
}

// Generic type: the same setup function works for any Mapper subclass
public ControlledJob defineJobs(..., Class<? extends Mapper> mymapper) {
    ...
    job2.setMapperClass(mymapper);
    ControlledJob cj = new ControlledJob(conf);
    cj.setJob(job2);
    return cj;
}

public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();

    // Create jobs
    Job job = Job.getInstance(conf, "init");
    ...
    ControlledJob cj1 = new ControlledJob(conf);
    cj1.setJob(job);
    ControlledJob cj2 = defineJobs(..., MapperBotnet.class);

    // Create the controller, add jobs and dependencies
    JobControl control = new JobControl("My detector");
    control.addJob(cj1);
    control.addJob(cj2);
    cj2.addDependingJob(cj1);

    // Controller execution + monitoring every 2s
    Thread t = new Thread(control);
    t.start();
    while (!control.allFinished()) {
        // display the names of the running jobs
        Thread.sleep(2000);
    }
}

Dependent jobs

hadoop jar IPCount.jar IPCountPktsAttacksChain2 -D mapred.reduce.tasks=4 data attacker.xml botnet dos spam
11/05/28 16:07:59 INFO mapred.FileInputFormat: Total input paths to process : 1
*** Running jobs: 1st job
...
11/05/28 16:08:41 INFO mapred.FileInputFormat: Total input paths to process : 4
*** Running jobs: dos botnet spam
...
*** Running jobs: botnet spam
...
*** Running jobs: spam


The Problem
- Join two sources of data on a shared key
- Classical process in data analytics
- easy with SQL

SELECT user.id, user.name, city.name
FROM user [INNER|LEFT|RIGHT] JOIN city ON user.zipcode = city.zipcode

- Several types
  - inner join (usual case): shared key in both tables
  - outer join: left/right/full (all records of the left/right/both tables even if there is no shared key)
  - antijoin: only records with no match
  - cartesian product: all pair-wise records

Questions
- Complexity: memory usage vs. network usage
- Where to do the join? Map or Reduce side?

Example
- A simple example to extract flow records containing suspect IP addresses
  - example: extract Netflow records of suspect hosts
  - data sources: netflow records, list of suspect IP addresses
  - shared key: the source IP address

Flows.csv
888,3752921443,2463760020,14,1623,1222173626,357,1222173630,810,22,2295,27,6
889,3752919153,2463760020,14,1623,1222173626,359,1222173630,982,22,3054,27,6
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
891,3752921425,2463760020,16,1739,1222173626,364,1222173630,662,22,1715,27,6
892,3752921479,2463760020,14,1623,1222173626,364,1222173630,947,22,1894,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
894,2463760020,3752921531,12,1152,1222173627,237,1222173630,824,1048,22,27,6
895,2463760020,3752921533,12,1152,1222173627,288,1222173630,939,1413,22,27,6
896,3752921531,2463760020,14,2317,1222173627,475,1222173630,824,22,1048,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6
898,2463760020,3752980554,13,1204,1222173627,965,1222173630,399,2399,22,27,6
899,2463760020,3752980558,14,1256,1222173628,54,1222173630,418,1966,22,27,6

Suspects.txt
3752920950
3752921533

Join result:
890,3752920950,2463760020,14,1631,1222173626,361,1222173630,830,22,4669,27,6
893,3752920950,2463760020,16,1747,1222173626,365,1222173630,794,22,4929,27,6
897,3752921533,2463760020,13,2259,1222173627,521,1222173630,939,22,1413,27,6

Map-side join
- The join may be done by the mappers → extra operations for guaranteeing the accessibility of the data
- Job design: usual join between a big file and multiple small ones → the small files can be put on every machine for the mapper (no huge memory overhead)
- use the setup function to read the small file (at the instantiation of the Mapper)
  - protected void setup(Context context) throws IOException, InterruptedException
  - We already did it by using an XML configuration file with specific properties
    - not really designed for that, needs parsing → distribute the small file to all mappers

DistributedCache
- DistributedCache
  - distributes data on all machines
  - addCacheFile(): add an HDFS URI to the cache
  - getLocalCacheFiles(): get the local cache file paths (the file is only duplicated locally when required)
  - standard Java I/O: BufferedReader, FileReader
- but deprecated now → use directly addCacheFile() from Job + getCacheFiles() from Context

DistributedCache (Deprecated)
- Example
  - create a file with suspect IP addresses + put it on HDFS (hdfs dfs -put)
  - output flows only for suspect IP addresses

Suspect file (one IP per line): 3709183202 3709183222 3709183228 3709183307 3709183325 3709184105 3709184106 3709184107 3709184108 3709184109 3709185685 3709186765 3709187367 3709187461 3709187931 3709188053

public static class Map ... {
    Vector<String> suspects = new Vector<String>();
    public void configure(JobConf job) {
        try {
            suspectIP = DistributedCache.getLocalCacheFiles(job)[0];
            BufferedReader reader = new BufferedReader(new FileReader(suspectIP.toString()));
            ...
        } catch (IOException e) { e.printStackTrace(); }
    }
    public void map(...) throws IOException {
        // iterate over suspects for comparing
    }
}

public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
    ...
}

DistributedCache
- Example
  - create a file with suspect IP addresses + put it on HDFS (hdfs dfs -put), as before
  - output flows only for suspect IP addresses

public static class Map ... {
    static Vector<String> suspects = new Vector<String>();
    protected void setup(Context context) throws IOException, InterruptedException {
        URI suspectIP = context.getCacheFiles()[0];
        BufferedReader reader = new BufferedReader(new FileReader(suspectIP.getPath()));
        // construct suspects
    }
    public void map(...) throws IOException, InterruptedException {
        for (String s : suspects) {
            if .....
        }
    }
}

public int run(String[] args) throws Exception {
    ...
    job.addCacheFile(new Path(args[2]).toUri());
    ...
}

Alternative
- Exercise:
  - is it possible to read the file in the map function?
    - Yes, in the same manner, since we have the Context object (a sketch of this variant follows)
  - what is the difference between this approach and the previous ones in terms of I/O and memory?
    - 1st one: one read + a structure in memory
    - 2nd one: multiple reads but does not consume memory
  - Can you check it experimentally? What is the best approach?
    - dependent on the scenario
    - small size → setup
    - medium size → map or reduce
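A sketch of the second variant, re-reading the cached file inside map() instead of keeping a structure in memory (the class name and the flow-file column layout are assumptions):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: scans the cached suspect file for every input record (low memory, more I/O).
public class SuspectFilterMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String srcIp = value.toString().split(",")[1];           // assumed source-IP column
        URI suspectFile = context.getCacheFiles()[0];
        BufferedReader reader = new BufferedReader(new FileReader(suspectFile.getPath()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().equals(srcIp)) {                  // suspect → keep the whole flow record
                    context.write(new Text(srcIp), value);
                    break;
                }
            }
        } finally {
            reader.close();
        }
    }
}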

Join + Aggregate
- Exercise: count packets per IP address, only for suspect IP addresses
  - 1st approach with 2 separate jobs
    - use chaining → poor performance
  - Do both join and aggregation in a single MapReduce job, based on a map-side join
    - use a combiner + reducer for aggregation
  - Where is the aggregation done? What about performance?
    - do the aggregation before the join → less data for the join operation
  - Can we do it differently, and what is the difference in terms of performance?
    - do the aggregation at the reduce side (with a cached file) → numerous records during the shuffle → network consuming

Reduce-side join
- Example
  - create a file with suspect IP addresses + put it on HDFS (hdfs dfs -put)
  - count packets per IP address + a reducer to keep values only for suspect IP addresses

public static class Map ... {
    static Path suspectIP;
    public void configure(JobConf job) {
        try {
            suspectIP = DistributedCache.getLocalCacheFiles(job)[0];
        } catch (IOException e) { e.printStackTrace(); }
    }
    public void map(...) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(suspectIP.toString()));
        ...
    }
}

public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
    ...
}

Join between large files
- Previous examples → one file is small
- Harder issue: join two large files
  - neither of them can be fully copied to each mapper or reducer
  - all records that may join should be sent to a single reducer to join them
  - the records that have to be joined share a common key (e.g. the IP address) = input key of the reducer
  - one mapper per file
    - key = shared key
    - value = the specific "columns" to be included in the join
  - Any problem?
    - multiple records for a single key may come from the same file → join over the same file!
    - prefix the value with the origin of the data → avoids joining records from the same file

Example
- Change the exercise a bit:
  - the suspects file contains IP addresses (1st field) and the type of threat (2nd field)
  - the result is a list of suspect flows with the type of threat indicated at the end of the line

Flows.csv: 3752919153,2463760020 / 3752920950,2463760020 / 3752921479,2463760020 / 3752920950,2463760020 / 3752921533,2463760020
Suspects.txt: 3752920950,bot / 3752921533,dos

Mapper output (Key | Label | Value):
  from Flows.csv:    3752919153  flows  2463760020   (and so on for each flow line)
  from Suspects.txt: 3752920950  addr   bot
                     3752921533  addr   dos

Reducer input (grouped by key):
  3752919153 → { flows, 3752919153, 2463760020 }
  3752920950 → { flows, 3752920950, 2463760020 ; flows, 3752920950, 2463760020 ; addr, 3752920950, bot }
  3752921479 → { flows, 3752921479, 2463760020 }
  3752921533 → { flows, 3752921533, 2463760020 ; addr, 3752921533, dos }

The reduce side
- How to get the join
  - reducer: cross-product, for each key, of the values carrying different labels
  - combine() function: final output applied on each combination produced by reduce() → example: copy the record if the source IP address is in the suspect host list

reduce (key 3752920950): { flows, 3752920950, 2463760021 ; flows, 3752920950, 2463760023 ; addr, 3752920950, bot }
combine: (flows, 3752920950, 2463760021) × (addr, bot) → 3752920950, 2463760021, bot
         (flows, 3752920950, 2463760023) × (addr, bot) → 3752920950, 2463760023, bot
(the record for 3752919153 has no matching addr entry and is not output)

Implementation - values
- Avoid packing the label in a Text → create your own type with an integer as the label
  - serialization with a String? → how many bytes to read?
  - to start: keep only one field, like the packet number
  - creating a sub-class of Text makes things easy
  - do not forget the empty constructor for reflection
  - the best: create a composite type with all fields of the flows
  - (a possible completion of LabelText follows)

public class SuspectsFlows {
    private final static int labelFlows = 0;
    private final static int labelSuspects = 1;

    public static class LabelText extends Text {
        private int label;
        public LabelText() { super(); }
        public LabelText(String s, int label) { ... }
        public void readFields(DataInput arg0) throws IOException { ... }
        public void write(DataOutput arg0) throws IOException { ... }
    }
}
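One possible completion of LabelText, shown here as a standalone class (in the course code it is nested inside SuspectsFlows); serializing the int label before the inherited Text payload is an assumption about the layout, not the reference solution.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;

// Hypothetical completion: a Text value tagged with the file it came from.
public class LabelText extends Text {
    private int label;

    public LabelText() { super(); }                      // required for reflection

    public LabelText(String s, int label) {
        super(s);
        this.label = label;
    }

    public int getLabel() { return label; }

    @Override
    public void readFields(DataInput in) throws IOException {
        label = in.readInt();                            // read the tag first...
        super.readFields(in);                            // ...then the underlying Text bytes
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(label);
        super.write(out);
    }
}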

Implementation - Mappers
- 2 files with different formats → 2 mappers
  - nothing special... but be careful about types
  - the mappers will output to the same reducer
  - the mapper output types have to be the same
  - emit the right label in the value (different for each mapper)
  - the best: create a parent class and sub-classes
- But how to assign a mapper to a specific file... and how to use multiple files as input?
  - MultipleInputs

Implementation - Main
- Specify multiple input files and the associated mappers

import org.apache.hadoop.mapreduce.lib.input.*;
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapFlows.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapSuspects.class);

- The same exists for output files
  - multiple reducers
  - every reducer receives the same data → useful for applying parallel reducers on the same map output

Implementation - Reducer
- The reducer has to do the cross product
  - → merge only values coming from two different mappers (or even more)
  - retrieve the label contained in the value
  - avoid duplicated results (= avoid iterating over n × n)
  - use NullWritable if you don't need a key or a value
  - you can also program a left outer join and a right outer join this way

public void reduce(LongWritable key, Iterable<LabelText> values, Context context)
        throws IOException, InterruptedException {
    Vector<String> v1 = new Vector<String>();
    Vector<String> v2 = new Vector<String>();
    for (LabelText val : values) {
        if (val.getLabel() == labelFlows) {
            v1.add(val.toString());
        } else {
            ...
        }
    }
    for (int i = 0; i < v1.size(); i++) {
        ...
        context.write(NullWritable.get(), ...);
    }
}

Outline
1 Introduction
2 Hadoop
3 Installation
4 In practice
5 Design Patterns
6 Applications
  Matrix computations
  Traffic Monitor
  Topology
7 Tools

Problem description
- Matrices and vectors are used in many applications in big data
  - data mining, machine learning... → useful for data analytics
  - different operations: addition, subtraction, product, ...
- → use MapReduce to split such operations into smaller ones that are processed in parallel
- → what to think carefully about:
  - how to represent matrix and vector data so that they are processed easily by Hadoop?
  - correlation between values of the input and output?
  - what will be the keys and values during map and reduce?

Data representation
- Vectors and matrices have to be handled and parsed
  - two types of files → the mapper should know what kind of data is in use
    - two files + the mapper has to check which filename is in use (possible)
    - easiest way: use a single file + prefix each line with the corresponding "type" of data
- Values in the vector
  - each value is represented by an index
  - one value per line, as index + value
- Values in the matrices
  - each row is indexed, with multiple columns
  - one row per line, as index + column1 + column2 + ...
  - or one value per line, as row number + column number + value

Addition

A = (a_{ij}) and B = (b_{ij}) are two m × n matrices.

A + B = C with c_{ij} = a_{ij} + b_{ij}

- → use (i,j) as keys

Pseudo code (addition)
- Pseudo-code of the mappers, map(k,v):

k,v = line_number, line
# parse_line extracts the 3 fields: i, j, a_ij or b_ij
i,j,val = parse_line(line)
emit((i,j), val)

- Pseudo-code of the reducer, reduce(k,v):

k,v = (i,j), list
sum = 0
for each val in list:
    sum = sum + val
emit((i,j), sum)

(a Java sketch follows)
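A sketch of this addition job in the Hadoop Java API; it is a minimal version assuming the single-file layout suggested above, with lines of the form "A,i,j,value" or "B,i,j,value", and the class names are illustrative.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MatrixAdd {

    // Mapper: key the cell by its coordinates, the value is the cell itself.
    public static class AddMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] f = value.toString().split(",");    // f[0] = A or B, f[1] = i, f[2] = j, f[3] = value
            context.write(new Text(f[1] + "," + f[2]),
                          new DoubleWritable(Double.parseDouble(f[3])));
        }
    }

    // Reducer: one a_ij and one b_ij arrive under the same (i,j) key; sum them.
    public static class AddReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable v : values) {
                sum += v.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }
}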

Multiplication

A = (a_{ik}) is an m × n matrix and B = (b_{kj}) is an n × p matrix.

A × B = C with c_{ij} = \sum_{k=1}^{n} a_{ik} b_{kj}

- → use (i,j) as keys
- → each value of A and B will be used for multiple values of C
- → the reducer will do the final sum → it has to know the matching between the values and k

Pseudo code (multiplication)
- Pseudo-code of the mappers, map(k,v):

k,v = line_number, line
# parse_line extracts the 4 fields: A or B, i, j, a_ij or b_ij
mat,i,j,val = parse_line(line)
if mat = A:
    for k in 1,..,p:
        emit((i,k), (A,j,val))
else (mat = B):
    for k in 1,..,m:
        emit((k,j), (B,i,val))

- Pseudo-code of the reducer, reduce(k,v) (a Java sketch of this reducer follows):

k,v = (i,j), list
dictA = {}, dictB = {}
for each (mat,ind,val) in list:
    if mat = A:
        dictA[ind] = val
    else:
        dictB[ind] = val
sum = 0
for k in 1,..,n:
    sum = sum + dictA[k] * dictB[k]
emit((i,j), sum)
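A sketch of that reducer in Java, assuming the mapper emits, under a Text "i,j" key, a Text value of the form "A,k,value" or "B,k,value" (the string encoding is kept simple here; the course advises replacing it with a custom Writable). Skipping missing entries also copes with sparse matrices.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer for c_ij: rebuild row i of A and column j of B, then take the dot product.
public class MultiplyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Map<Integer, Double> rowOfA = new HashMap<Integer, Double>();
        Map<Integer, Double> colOfB = new HashMap<Integer, Double>();
        for (Text v : values) {
            String[] f = v.toString().split(",");        // f[0] = A or B, f[1] = summation index k, f[2] = value
            int k = Integer.parseInt(f[1]);
            double val = Double.parseDouble(f[2]);
            if ("A".equals(f[0])) {
                rowOfA.put(k, val);
            } else {
                colOfB.put(k, val);
            }
        }
        double sum = 0;
        for (Map.Entry<Integer, Double> e : rowOfA.entrySet()) {
            Double b = colOfB.get(e.getKey());
            if (b != null) {                              // missing entries simply contribute 0
                sum += e.getValue() * b;
            }
        }
        context.write(key, new DoubleWritable(sum));
    }
}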

Issues
- Does it handle sparse matrices, which are very usual in Big Data?
  - No
  - solution: add additional tests in the reducer before getting values from the hash tables
- What about n, m and p?
  - they are not known in advance by the mappers and reducers
  - put them in the code or pass them as parameters
  - in many cases, the user does not know them either → an additional MR job to compute them
- What about performance?
  - each reducer has an in-memory copy of a single column and row → these might be (too) big

New version
- By reading A, it is impossible to know the number of columns of B
- By reading B, it is impossible to know the number of rows of A
- when parsing each matrix, do not rely on unknown parameters for computing the map output key
- in c_{ij} = \sum_{k=1}^{n} a_{ik} b_{kj}, only k is in common
- use k as the key


Intro

Hadoop

Install

In practice

Design Patterns

Apps

Tools

Matrix computations Traffic Monitor Topology

Pseudo code (multiplication)

I Pseudo-code of the mappers, map(k,v):

    k,v = line_number, line
    // parse_line extracts the fields:
    // matrix (A or B), i, j, a_ij or b_ij
    mat,i,j,val = parse_line(line)
    if mat = A:
        emit(j, (A,i,val))
    else:
        emit(i, (B,j,val))
    end if
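A possible Java counterpart of this mapper (a sketch, not the original course code: it assumes one matrix entry per input line in the form A,i,j,value or B,i,j,value; class names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Map output key = the shared index k (column index of A / row index of B)
    public class MatrixEntryMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String[] f = line.toString().split(",");   // mat, i, j, value
            long i = Long.parseLong(f[1]);
            long j = Long.parseLong(f[2]);
            if (f[0].equals("A"))
                ctx.write(new LongWritable(j), new Text("A," + i + "," + f[3])); // j plays the role of k
            else
                ctx.write(new LongWritable(i), new Text("B," + j + "," + f[3])); // i plays the role of k
        }
    }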


Pseudo code (multiplication)

I Pseudo-code of the reducer, reduce(k,v):

    // k is the shared index, v = list of (mat,ind,val)
    listA = [], listB = []
    for each (mat,ind,val) in list:
        if mat = A:
            listA = listA + (ind,val)
        else:
            listB = listB + (ind,val)
        end if
    end for
    for (indA,valA) in listA:
        for (indB,valB) in listB:
            emit((indA,indB), valA*valB)
        end for
    end for
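A matching reducer sketch in Java (same assumptions on the value format as above); it rebuilds the two lists in memory and emits the partial products keyed by the cell (i,j):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MatrixEntryReducer extends Reducer<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(LongWritable k, Iterable<Text> values, Context ctx)
                throws IOException, InterruptedException {
            List<String[]> listA = new ArrayList<String[]>();
            List<String[]> listB = new ArrayList<String[]>();
            for (Text v : values) {                 // entries of column k of A and row k of B
                String[] f = v.toString().split(",");
                if (f[0].equals("A")) listA.add(f); else listB.add(f);
            }
            for (String[] a : listA)                // emit a_ik * b_kj for every pair (i,j)
                for (String[] b : listB)
                    ctx.write(new Text(a[1] + "," + b[1]),
                              new DoubleWritable(Double.parseDouble(a[2]) * Double.parseDouble(b[2])));
        }
    }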


Pseudo code (final MapReduce)

I Pseudo-code of the mappers, map(k,v):

    k,v = (i,j),val
    // nothing to do, the aggregation
    // will be done by the reducer
    emit (i,j),val

I Pseudo-code of the reducers, reduce(k,v):

    k,v = (i,j),list
    sum = 0
    for val in list:
        sum = sum + val
    emit (i,j),sum
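As a Java sketch (assuming the first job wrote its output with TextOutputFormat as "i,j&lt;TAB&gt;partial_product", read back with KeyValueTextInputFormat and the identity mapper):

    import java.io.IOException;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // The key is the "i,j" cell, the values are the partial products a_ik * b_kj
    public class CellSumReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text cell, Iterable<Text> partials, Context ctx)
                throws IOException, InterruptedException {
            double sum = 0;
            for (Text p : partials)
                sum += Double.parseDouble(p.toString());  // c_ij = sum of the partial products
            ctx.write(cell, new DoubleWritable(sum));
        }
    }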


Another version?

I Main issue: storing a column or a row in memory
  I split rows and columns
  I partial computation
  I block matrix multiplication
    I rows of A are split in blocks of size r
    I columns of A are split in blocks of size s
    I rows of B are split in blocks of size s
    I columns of B are split in blocks of size t
    I the division does not need to be regular, but it is easier


Multiplication

A = \begin{pmatrix} A_{11} & A_{12} & \cdots & A_{1,n/s} \\ A_{21} & A_{22} & \cdots & A_{2,n/s} \\ \vdots & \vdots & \ddots & \vdots \\ A_{m/r,1} & A_{m/r,2} & \cdots & A_{m/r,n/s} \end{pmatrix}
\qquad
B = \begin{pmatrix} B_{11} & B_{12} & \cdots & B_{1,p/t} \\ B_{21} & B_{22} & \cdots & B_{2,p/t} \\ \vdots & \vdots & \ddots & \vdots \\ B_{n/s,1} & B_{n/s,2} & \cdots & B_{n/s,p/t} \end{pmatrix}

A \times B = C, \qquad C_{ij} = \sum_{k=1}^{n/s} A_{ik} \times B_{kj}

I → sum of matrix products
I → similar code when using entire matrices
I → need a preprocessing step to split the matrices
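For illustration (a small example added here, not from the original slides), with n/s = 2 block-columns in A the top-left block of C is the sum of two block products:

C_{11} = \sum_{k=1}^{2} A_{1k} \times B_{k1} = A_{11} \times B_{11} + A_{12} \times B_{21}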


Outline 1 Introduction 2 Hadoop 3 Installation 4 In practice 5 Design Patterns 6 Applications

Matrix computations Traffic Monitor Topology 7 Tools 149


A multi-functional tool

I Extension of the previous toy examples
  I count the bandwidth consumption...
  I ...regarding different criteria: per IP address, per port...
  I track the metrics along time: per minute
  I allow the user to easily define the main parameters
  I nice (understandable) output
  I avoid manipulating strings: create your custom types
I Example of user actions
  I compute traffic metrics per IP address and per minute
  I compute traffic metrics per IP address and per port
  I → optimization: try to reuse prior computations


A multi-functional tool

I output example (for port)
I source and destination information is not differentiated

    23-8-2008 2:44 4985 --> 2 pkts / 100 bytes
    23-8-2008 2:44 4988 --> 52 pkts / 6260 bytes
    23-8-2008 2:44 4990 --> 2 pkts / 100 bytes
    23-8-2008 2:44 5913 --> 3 pkts / 120 bytes
    ....
    23-8-2008 2:45 1088 --> 25 pkts / 3395 bytes
    23-8-2008 2:45 1094 --> 28 pkts / 2931 bytes
    23-8-2008 2:45 1096 --> 2 pkts / 100 bytes
    ....
    23-8-2008 2:46 1519 --> 28 pkts / 2899 bytes
    23-8-2008 2:46 1520 --> 26 pkts / 2831 bytes
    23-8-2008 2:46 1528 --> 25 pkts / 3069 bytes


Map Reduce Workflow

I Aggregate flows per the different criteria: IP address, port, time
  I reuse data → aggregate data step by step and keep intermediate results
I issue: the aggregation has to be done in the right order of granularity in order to reconstruct the necessary data afterwards


Map Reduce Workflow

I Aggregate flows per the different criteria: IP address, port, time
  I reuse data → aggregate data step by step and keep intermediate results
I reuse previous results at the second run (no need to read all flows again)


Multiple Map Reduce

(Workflow diagram: input records "index, src_ip, dst_ip, src_port, dst_port, #pkts, #bytes, time" are first aggregated into "@IP, port, time, sum(#pkts), sum(#bytes)"; a second job removes the port information to obtain "@IP, time, sum(sum(#pkts)), sum(sum(#bytes))", and another removes the @IP information to obtain "Port, time, sum(sum(#pkts)), sum(sum(#bytes))" — see the sketch below.)

I keys and values = structured data
  I use string + parsing → bad design
  I use predefined types for values + custom types for keys
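A possible way to chain the two aggregation jobs with the old mapred API used elsewhere in this document (a sketch: job names, paths and the commented-out mapper/reducer classes are placeholders, not the original code):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class TrafficWorkflow {
        public static void main(String[] args) throws Exception {
            JobConf perPort = new JobConf(TrafficWorkflow.class);
            perPort.setJobName("per-ip-port-minute");
            FileInputFormat.setInputPaths(perPort, new Path("flows"));
            FileOutputFormat.setOutputPath(perPort, new Path("agg-ip-port-minute"));
            // perPort.setMapperClass(...); perPort.setReducerClass(...);
            JobClient.runJob(perPort);            // blocks until the first aggregation is done

            JobConf perIp = new JobConf(TrafficWorkflow.class);
            perIp.setJobName("per-ip-minute");
            // reuse the intermediate result instead of re-reading all the flows
            FileInputFormat.setInputPaths(perIp, new Path("agg-ip-port-minute"));
            FileOutputFormat.setOutputPath(perIp, new Path("agg-ip-minute"));
            // perIp.setMapperClass(...); perIp.setReducerClass(...);
            JobClient.runJob(perIp);
        }
    }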


Custom types definition

I Recall: specific interfaces to implement
  I key: WritableComparable
  I value: Writable
I + default constructor without parameter (for reflection)
I Define toString() for customizing the output

    static class TIP implements WritableComparable<TIP> {
        private long ip;
        private long port;
        private long time;

        public TIP() {}
        public void readFields(DataInput arg0) throws IOException {
            // read the fields in the same order as they are written
            ip = arg0.readLong();
            port = arg0.readLong();
            time = arg0.readLong();
        }
        public void write(DataOutput arg0) throws IOException {
            arg0.writeLong(ip);
            arg0.writeLong(port);
            arg0.writeLong(time);
        }
        public int compareTo(TIP o) { /* see next slide */ }
        public String toString() { /* custom output format */ }
    }


Custom types definition

I DataInput / DataOutput are used for serialization
  I → the order of reads and writes is important!
  I → easy with fixed-length types
I compareTo(Object o)
  I necessary and important function
  I used during the sort phase for preparing the input of reducers
  I usual way to define it: compare individual fields

    public int compareTo(TIP o) {
        if (((time == o.time) ? ((ip == o.ip) ? port - o.port : ip - o.ip) : time - o.time) < 0) return -1;
        if (((time == o.time) ? ((ip == o.ip) ? port - o.port : ip - o.ip) : time - o.time) > 0) return 1;
        return 0;
    }


First run

I Does it work properly?
  I yes... and no
  I → a useful tool should display results sorted per time, unlike our tool with multiple reducers
I issue: the order is only guaranteed locally
  I global order is not guaranteed... except with one reducer (trivial case), but poor performance

(Diagram: after sort/shuffle/aggregate, the reducers output 10:00 10:02 10:10 / 10:03 10:06 / 10:01 10:05 10:11 — the concatenated final output 10:00 10:02 10:10 10:03 10:06 10:01 10:05 10:11 is not sorted.)


Sort the results

I Create your custom partitioner
I Use the TotalOrderPartitioner
  I performance → cannot perform a sort on all keys
  I partition file ∼ sequential split of keys

(Diagram: with the default partitioner each reducer output is only locally sorted, e.g. {10:00, 10:02, 10:10}, {10:03, 10:06}, {10:01, 10:05, 10:11}; with a total-order partitioner each reducer receives a contiguous key range, e.g. {10:00, 10:01, 10:02}, {10:03, 10:05}, {10:06, 10:10, 10:11}, so the concatenated output is globally sorted.)
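For the first option, a custom partitioner could look like the following sketch (hypothetical: it assumes the minimum and maximum timestamps are passed as job parameters with made-up names, and that the TIP key exposes a getTime() accessor):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Range partitioner: keys falling in the same time interval go to the same
    // reducer, so the locally sorted outputs form contiguous, ordered ranges.
    public class TimeRangePartitioner implements Partitioner<TIP, LongWritable> {
        private long minTime, maxTime;

        public void configure(JobConf conf) {
            minTime = conf.getLong("traffic.min.time", 0L);   // hypothetical parameter names
            maxTime = conf.getLong("traffic.max.time", 1L);
        }

        public int getPartition(TIP key, LongWritable value, int numPartitions) {
            double ratio = (double) (key.getTime() - minTime) / (maxTime - minTime + 1);
            return Math.min(numPartitions - 1, (int) (ratio * numPartitions));
        }
    }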


Partition file

I Split the keys to have a good load balancing between reducers
  I similar intervals in the key space are not efficient → one bottleneck
  I use the distribution of keys (number of values per key)
    I computation on a subset of the database
    I InputSampler + subclasses

(Diagram: two histograms of #values over the keys k1..k8 — splitting the key space into equal intervals vs. splitting it according to the distribution of the keys.)


TotalOrderPartitioner

I Create a partition file
  I use the RandomSampler

    RandomSampler sampler = new InputSampler.RandomSampler(0.05, 15);
    InputSampler.writePartitionFile(jobConf, sampler);

  I a sampler needs a file to sample → executed on the input data of the job (an Identity job may be necessary)

    DistributedCache.addCacheFile(new URI("_partition.lst#_link"), conf);
    DistributedCache.createSymlink(jobConf);

I Use the TotalOrderPartitioner
  I set the partitioner
  I set the partition file

    jobConf.setPartitionerClass(TotalOrderPartitioner.class);
    Path partitionFile = new Path("_link");
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);


The Final Step

I A remaining issue?
  I can you figure out the problem?
  I → multiple outputted results for a single key (custom type)
  I → no hash function is defined to assign a unique tuple to a unique reducer
  I → define int hashCode(): use the hashCode functions of nested types

    public int hashCode() {
        return (new Long(ip*port*time)).hashCode();
    }


Outline 1 Introduction 2 Hadoop 3 Installation 4 In practice 5 Design Patterns 6 Applications

Matrix computations Traffic Monitor Topology 7 Tools 162


Topology tool

I Position of a node in a network ∼ importance of the node ∼ role of the node
I → centrality (various definitions exist)
I Exercise: for each node, compute the number of neighbors whose distance is at most k hops
I Dataset provided by CAIDA: http://www.caida.org/tools/measurement/skitter/router_topology/itdk0304_rlinks_undirected.gz


Design

I Incremental computation ∼ one Map Reduce per hop
  I first Map: each node informs its neighbors that it exists
  I first Reduce: each node computes its direct neighbors
  I second Map: each node sends its list of neighbors to all its neighbors
  I second Reduce: each node computes its neighbors at the second hop...
I the dataset is not bi-directional → pre-process to have all links in both directions
I each node has to know its neighbors all along the process → it also sends its own neighbor list to itself
I the neighbors of the neighbors may not be disjoint → use a set type (see the sketch below)
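A minimal Java sketch of one iteration of this job (assumptions: the previous iteration produced one line per node in the form "node&lt;TAB&gt;n1,n2,..."; class names and the text encoding are illustrative, not the original code):

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class HopExpansion {
        public static class PropagateMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context ctx)
                    throws IOException, InterruptedException {
                String[] parts = value.toString().split("\t");   // node, neighbor list
                Text node = new Text(parts[0]);
                Text neighbors = new Text(parts[1]);
                ctx.write(node, neighbors);                      // keep its own set
                for (String n : parts[1].split(","))             // and send it to every neighbor
                    ctx.write(new Text(n), neighbors);
            }
        }

        public static class UnionReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text node, Iterable<Text> lists, Context ctx)
                    throws IOException, InterruptedException {
                Set<String> reachable = new HashSet<String>();   // the set removes duplicates
                for (Text l : lists)
                    for (String n : l.toString().split(","))
                        reachable.add(n);
                reachable.remove(node.toString());               // do not count the node itself
                StringBuilder out = new StringBuilder();
                for (String n : reachable) {
                    if (out.length() > 0) out.append(',');
                    out.append(n);
                }
                ctx.write(node, new Text(out.toString()));
            }
        }
    }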


PageRank

I Global link analysis
  I Google's web page ranking algorithm
  I a page/host is highly scored if it is well pointed to by others, especially if these latter have high scores
I Iterative computation
  I equal scores at the beginning
  I stop when stable
  I score propagation
  I weighted nodes (bot knowledge)

P_t(i) = (1 - d) \sum_{k=1}^{n} W(k) + d \sum_{(j,i) \in E} \frac{P_{t-1}(j)}{O_j}

I Both communication directions are important → invert the arrows → two values per node: hub, authority


PageRank on Map-Reduce

I Node = ID [key] + (score + adjacent nodes) [value]

(Diagram: in the map tasks, each mapper reads one node and emits, for every adjacent node, the current score divided by the number of neighbors — e.g. node 1 with neighbors 2, 3, 4 emits 0.3 to each of them, while nodes 3 and 4 emit 1 to their single neighbor; shuffle and sort aggregates the emitted values by node ID; each reducer sums the received contributions to produce the new scores, e.g. 2 → 0.3, 3 → 1.3, 4 → 1.3.)
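One iteration can be sketched in Java as follows (a simplified sketch, not the original code: it assumes the line format "node&lt;TAB&gt;score&lt;TAB&gt;adj1,adj2,...", that every node has at least one neighbor, and it ignores the damping factor and node weights of the formula above):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class PageRankIteration {
        public static class PRMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context ctx)
                    throws IOException, InterruptedException {
                String[] f = value.toString().split("\t");     // id, score, adjacency list
                String[] adj = f[2].split(",");
                double share = Double.parseDouble(f[1]) / adj.length;
                ctx.write(new Text(f[0]), new Text("GRAPH\t" + f[2]));  // keep the structure
                for (String n : adj)                            // send a share of the score
                    ctx.write(new Text(n), new Text(Double.toString(share)));
            }
        }

        public static class PRReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text node, Iterable<Text> values, Context ctx)
                    throws IOException, InterruptedException {
                double score = 0;
                String adjacency = "";
                for (Text v : values) {
                    String s = v.toString();
                    if (s.startsWith("GRAPH\t")) adjacency = s.substring(6);  // re-attach the adjacency
                    else score += Double.parseDouble(s);                      // sum the contributions
                }
                ctx.write(node, new Text(score + "\t" + adjacency));
            }
        }
    }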


Outline 1 2 3 4 5 6 7

Introduction Hadoop Installation In practice Design Patterns Applications Tools 168


Overview

I Standard tools → type hadoop in a terminal
  I a lot of admin commands for running nodes
  I fs: HDFS manipulation
  I check the classpath
  I run a jar
  I job management (list, kill...)
I Many extensions
  I database-like modules: Pig, Hive, HBase, Sqoop
  I workflow design: Oozie
  I mathematical tool: Hama
  I search engines: Lucene
  I user-defined type serialization: Thrift
  I end-user interface: Hue


Outline 1 Introduction 2 Hadoop 3 Installation 4 In practice 5 Design Patterns 6 Applications 7 Tools

Pig Hue


Motivation

I Hadoop / Map Reduce
  I analysis of large datasets
  I algorithms have to be reprogrammed but also redesigned
  I need of a well-thought design for performance issues: join, sort...
I → can we avoid these drawbacks?
  I simple design
  I programs similar to the usual ones
  I close to standard dataset analysis ∼ database / procedural / SQL


Pig description

I http://pig.apache.org/
I High level language
I Pig translates this language into Hadoop job definitions, including optimizations

Pig Latin (on the local host):

    traffic = LOAD 'network_traffic' AS (src_ip, dst_ip, src_port, dst_port, pkts);
    bySrc = GROUP traffic BY src_ip;
    pkts = FOREACH bySrc GENERATE group, SUM(traffic.pkts);
    DUMP pkts;

(Diagram: the Pig engine reads and checks the script, prepares and optimizes the job, then launches and monitors it on the cluster.)


Pig description

I Optimizations
  I focus the design on what you want to do
  I avoid the issues of the previous examples about joins
  I filters at the beginning, numerical optimizations
I Is Pig really suited for production use or just for beginners?
  I 30-40% of Yahoo's Hadoop jobs are processed via Pig
  I Pig is also finely tunable


Run Pig

I Installation
  I download, unpack, configure (Hadoop namenode, jobtracker)
  I Cloudera includes Pig: apt-get install hadoop-pig
I Run: pig command
  I pig -help for knowing all options: remove optimizations, debugging, logging
  I two execution modes: local (pig -x local, useful for testing/debugging) or hadoop (default, pig -x mapreduce)
  I Grunt shell (without arguments), script execution (with an argument) or embedded in Java (PigServer) — see the sketch below
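A small embedded-Pig sketch (an illustration, assuming the Pig jar is on the classpath and the Hadoop configuration is visible to the JVM); it is equivalent to typing the statements in the Grunt shell:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class EmbeddedPig {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);   // or ExecType.LOCAL for tests
            pig.registerQuery("flows = LOAD 'flows100000.csv' USING PigStorage(',');");
            pig.registerQuery("dst_ips = FOREACH flows GENERATE $2;");
            pig.store("dst_ips", "res");                         // triggers the actual job
        }
    }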


Grunt Shell / Scripts

I Grunt Shell
  I useful for small tests
  I type help for a first help:
    I hdfs commands
    I job execution: exec, run (access to grunt aliases ∼ variables), kill
    I configuration: set
    I tools for analyzing the underlying data and processes: describe, explain
I Scripts
  I systems in production
  I standard extension .pig (same syntax as the shell)
  I same commands as the shell
  I comments: line starts with --, or between /* and */
  I parameters: -param myParam=10 → $myParam evaluated as 10 in the script


Load Data

I full documentation: Pig Latin reference manual 1 and 2
I LOAD 'data' [USING function] [AS schema];
  I 'data': data file
  I function: parsing function
  I schema: data structure
I Alias ∼ variable
I Extract one column of data: GENERATE field [AS name:type] → generate data from other data, functions...


Load Data

(Annotated example: the input file has the fields id, src_ip, dst_ip, #packets, start_time, ..., accessible as $0, $1, $2, $3, $4 or via aliases, which must start with a letter; PigStorage is the default load function, taking the separator as parameter, the default separator being \t; lines end with ;)

Unnamed fields:

    flows = LOAD 'flows100000.csv' USING PigStorage(',');
    dst_ips = FOREACH flows GENERATE $2;

Schema with named fields ($2 is still valid):

    flows = LOAD 'flows100000.csv' USING PigStorage(',') AS (ind,src_ip,dst_ip,pkts,start_time,start_msec,end_time,end_msec,src_port,dst_port,tcp_flags,proto);
    dst_ips = FOREACH flows GENERATE dst_ip;

Schema with type definition:

    flows = LOAD 'flows100000.csv' USING PigStorage(',') AS (ind,src_ip,dst_ip:long,pkts,start_time,start_msec,end_time,end_msec,src_port,dst_port,tcp_flags,proto);
    dst_ips = FOREACH flows GENERATE dst_ip;


Display / Store Data

I Display: DUMP alias
I Output in a file: STORE alias INTO 'directory' [USING function];
  I writing function: like parsing/reading (PigStorage)
I Result sampling
  I first n entries: alias = LIMIT alias n;
  I random sample: SAMPLE alias size; (size = sampling ratio)

    flows = LOAD 'flows100000.csv' USING PigStorage(',');
    dst_ips = FOREACH flows GENERATE $2;
    first = LIMIT dst_ips 10;
    DUMP first;

I Pig vs. SQL
  I Pig statements are not executed until STORE/DUMP commands = procedural
  I Pig is more flexible: the schema may not be defined, no need to define, create and populate a table


Types

I scalar: int, long, float, double, chararray (string), bytearray (default)
I complex:
  I tuple: ordered set containing various fields: ('alert',0.95,12)
  I bag: unordered collection of tuples, each tuple may have a different schema (different number or types of fields): { (10,5),('alert',0.95,12) } = { ('alert',0.95,12),(10,5) }
  I map: key-value pairs where each key is a string and unique: [traffic#normal,duration#450]
  I dereference operators for accessing nested data: x.field (bag/tuple), x#key (map)


Aggregation

I 2 operations (required)
  I grouping:
    I alias = GROUP aliasToGroup ALL | BY expression
    I creates a default schema with two fields: group (bytearray) + a bag representing the entries of the grouped alias
  I aggregation
    I use GENERATE + an aggregating function on the bag (COUNT, MAX, MIN, AVG, SUM,...)
    I based on the GROUP relation, or create a similar schema (when loading)

Grouping:

    flows = LOAD 'flows100000.csv' USING PigStorage(',') AS (ind,src_ip,dst_ip,pkts,start_time,start_msec,end_time,end_msec,src_port,dst_port,tcp_flags,proto);
    src_ip = GROUP flows BY src_ip;

    group: @IP1, flows: {(ind,src_ipA,dst_ipA,...), (ind,src_ipB,dst_ipB,...), (ind,src_ipC,dst_ipC,...)}
    group: @IP2, flows: {(ind,src_ipD,dst_ipD,...)}

Aggregation:

    countFlows = FOREACH src_ip GENERATE group, COUNT(flows);


Analysis

I DESCRIBE alias: get the schema of an alias

    grunt> DESCRIBE flows;
    flows: {ind: bytearray,src_ip: bytearray,dst_ip: bytearray,...
    grunt> DESCRIBE src_ip;
    src_ip: {group: bytearray,flows: {ind: bytearray,src_ip: bytearray,dst_ip: bytearray...
    grunt> DESCRIBE countFlows;
    countFlows: {group: bytearray,long}

I ILLUSTRATE alias: get sample data to display the step by step process

    grunt> flows2 = FOREACH flows GENERATE src_ip,dst_ip;
    ...
    grunt> ILLUSTRATE countFlows
    ...
    | flows2     | src_ip: bytearray | dst_ip: bytearray |
    |            | 3752980830        | 2463760020        |
    |            | 3752980830        | 2463760020        |
    | src_ip     | group: bytearray  | flows2: bag({src_ip: bytearray,dst_ip: bytearray})   |
    |            | 3752980830        | {(3752980830, 2463760020), (3752980830, 2463760020)} |
    | countFlows | group: bytearray  | long |
    |            | 3752980830        | 2    |


Analysis

I EXPLAIN alias
  I logical execution plan
  I physical execution plan
  I Map Reduce plan
  I optimizations are visible
  I many options: use with a script, dot output...


Filtering

I alias = FILTER aliasToFilter BY expression;
I Extract the data from aliasToFilter which are valid regarding the expression
I Standard operators and functions:
  I numerical operators: + - / × % (modulo)
  I comparisons: == != > < >= ...

Example (filter, group, count and sort; the first statements of the example were truncated):

    flowsFilter = FILTER flows BY ... >= 10;
    src_ip = GROUP flowsFilter BY src_ip;
    countFlows = FOREACH src_ip GENERATE group, COUNT(flowsFilter);
    countFlowsOrder = ORDER countFlows BY $1 DESC;
    STORE countFlowsOrder INTO 'res';


Data Join

I COGROUP = GROUP (with several relations)
  I creates a default schema: group (bytearray) + one bag per grouped alias representing its entries

    flows = LOAD 'flows100000.csv' USING PigStorage(',') AS (ind,src_ip,dst_ip,pkts,bytes,start_time,start_msec,end_time,end_msec,src_port,dst_port,tcp_flags,proto);
    src_ip = GROUP flows BY src_ip;
    countFlows = FOREACH src_ip GENERATE group, COUNT(flows);
    attackers = LOAD 'suspects.txt' USING PigStorage(',');
    log = COGROUP countFlows BY group, attackers by $0;

Result (outer join; columns: group, countFlows, attackers):

    3709183228  {(3709183228,63)}  {(3709183228,1)}
    ...
    1001324120  {(1001324120,1)}   {}

I Why not one grouping? Entries of both relations would be at the same level: @IP1 = { (flow1),(flow2)..(attIP)}


Data Join

I Different joins
  I default: outer join
  I others: inner, left outer, right outer
  I example: count flows from attackers

    log = GROUP countFlows BY group, attackers by host INNER;
    countAttacker = FOREACH log GENERATE group, countFlows.$1;

I JOIN ∼ (CO)GROUP
  I default: inner join
  I output: a flat set, no bag
  I optimizations: skewed, replicated ∼ DistributedCache

    log = JOIN attackers BY host, countFlows BY group USING 'skewed';
    countAttacker = FOREACH log GENERATE $0, $3;

    GROUP: (3709185685,{(62)}) (3709186765,{(122)}) (3709187367,{(85)})
    JOIN:  (3709185685,62) (3709186765,122) (3709187367,85)


Other operators

I Full list available in the documentation!
I alias = UNION alias1, alias2 [, alias...]
  I merge relations together
  I no need to have the same schema
  I order is not preserved
I alias = DISTINCT aliasAll → remove duplicated entries
I SPLIT aliasAll INTO alias1 IF expression1 [, alias IF expression...]
  I split a relation into several ones based on conditions
  I each tuple is assigned to 0-n new relations
I Common option: PARALLEL n → set the number of reduce tasks


Other operators

I flatten() ∼ inverse of grouping
  I removes the hierarchical structure of data
  I tuple: (1,(2,3)) → (1,2,3)
  I bag ∼ creates a cross product: (1,{2,3},{4,5}) → (1,2,4) (1,2,5) (1,3,4) (1,3,5)
I Example: extract the flows where suspect hosts are involved as src or dst

    flows = LOAD 'flows100000.csv' USING PigStorage(',') AS (ind,src_ip,dst_ip,pkts,bytes,start_time,start_msec,end_time,end_msec,src_port,dst_port,tcp_flags,proto);
    attackers = LOAD 'suspects.txt' USING PigStorage(',') AS (host,one);
    flowsGrSrc = COGROUP flows BY src_ip, attackers BY host INNER;
    flowsGrDst = COGROUP flows BY dst_ip, attackers BY host INNER;
    flowsSrc = FOREACH flowsGrSrc GENERATE group, flatten(flows);
    flowsDst = FOREACH flowsGrDst GENERATE group, flatten(flows);
    suspects = UNION flowsSrc, flowsDst;
    STORE suspects into 'res' USING PigStorage(',');


Outline 1 Introduction 2 Hadoop 3 Installation 4 In practice 5 Design Patterns 6 Applications 7 Tools

Pig Hue


Overview

I HUE = Hadoop User Experience
  I a graphical interface for Hadoop
  I web-based Desktop
  I included in the Cloudera Distribution


Desktop

I Access on port 8088 (default): http://localhost:8088/
I Standard tools for managing HDFS and jobs
I Custom tools can be created and integrated within the environment
  I Django web framework (model/view/controller)
  I may monitor/interact with any daemon (Hadoop or not)
  I Beeswax (Hive GUI ∼ SQL client)

(Screenshot: the Hue desktop with the Help, Job Browser, User Admin, quick status, Job Designer and FS Browser applications.)


FS Browser

I Simple GUI to manage HDFS files (add, delete, move, remove...)


Job Designer

I Prepare jobs: .jar or streaming
I Easy to re-execute a previous job

(Screenshot: the list of jobs — right-click to modify or execute; the "hadoop jar" arguments — class with main + arguments — are for information purpose only; the jar file has to be on HDFS, so upload it beforehand.)


Job Browser

I Log of jobs (running, finished, failed...)
I (Result) files can be opened via the FileViewer by double-clicking in the FS browser

(Screenshot: the job list with details and the FileViewer.)
