Hadoop, a distributed framework for Big Data
CS 237 Distributed Systems Middleware
Instructor: Nalini Venkatasubramanian

Introduction
1. Introduction: Hadoop's history and advantages
2. Architecture in detail
3. Hadoop in industry

What is Hadoop?
An Apache top-level project: an open-source implementation of frameworks for reliable, scalable, distributed computing and data storage. It is a flexible and highly available architecture for large-scale computation and data processing on a network of commodity hardware.

Brief History of Hadoop
Designed to answer the question: how do we process big data at reasonable cost and in reasonable time? The need first arose with the web search engines of the 1990s.
[Timeline slide: search-engine and Hadoop milestones, 1996-2013]

Hadoop's Developers
2005: Doug Cutting and Michael J. Cafarella developed Hadoop to support distribution for the Nutch search engine project. The project was funded by Yahoo!.
2006: Yahoo! gave the project to the Apache Software Foundation.

Google Origins
Google's GFS paper (2003), MapReduce paper (2004), and BigTable paper (2006) laid the groundwork for HDFS, Hadoop MapReduce, and HBase respectively.

Some Hadoop Milestones

o 2008 - Hadoop wins the Terabyte Sort Benchmark (sorted 1 terabyte of data in 209 seconds, beating the previous record of 297 seconds)
o 2009 - Avro and Chukwa became new members of the Hadoop framework family

o 2010 - Hadoop's HBase, Hive and Pig subprojects completed, adding more computational power to the Hadoop framework
o 2011 - ZooKeeper completed
o 2013 - Hadoop 1.1.2 and Hadoop 2.0.3 alpha released; Ambari, Cassandra and Mahout added

What is Hadoop?
Hadoop: an open-source software framework that supports data-intensive distributed applications, licensed under the Apache v2 license.

Goals / Requirements:
o Abstract and facilitate the storage and processing of large and/or rapidly growing data sets
o Structured and non-structured data
o Simple programming models
o High scalability and availability

o Use commodity (cheap!) hardware with little redundancy
o Fault tolerance
o Move computation rather than data

Hadoop Framework Tools

Hadoop's Architecture
o Distributed, with some centralization
o Main nodes of the cluster are where most of the computational power and storage of the system lies

o Main nodes run a TaskTracker to accept and reply to MapReduce tasks, and also a DataNode to store needed blocks as close as possible
o A central control node runs the NameNode to keep track of HDFS directories & files, and the JobTracker to dispatch compute tasks to TaskTrackers
o Written in Java; also supports Python and Ruby

Hadoop's Architecture

Hadoop Distributed Filesystem (HDFS)
o Tailored to the needs of MapReduce
o Targeted towards many reads of file streams; writes are more costly
o High degree of data replication (3x by default), so no need for RAID on normal nodes
o Large block size (64 MB)
o Location awareness of DataNodes in the network
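Both the replication factor and the block size are ordinary configuration knobs. A minimal sketch of overriding them in hdfs-site.xml, using the classic Hadoop 1.x property names (the values shown just restate the defaults above):

  <!-- hdfs-site.xml (sketch): classic 1.x property names -->
  <configuration>
    <property>
      <name>dfs.replication</name>
      <value>3</value>            <!-- replicas per block -->
    </property>
    <property>
      <name>dfs.block.size</name>
      <value>67108864</value>     <!-- 64 MB, in bytes -->
    </property>
  </configuration>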

Hadoop's Architecture: NameNode
o Stores metadata for the files, like the directory structure of a typical FS
o The server holding the NameNode instance is quite crucial, as there is only one
o Keeps a transaction log for file deletes/adds, etc.; does not use transactions for whole blocks or file streams, only metadata
o Handles creation of more replica blocks when necessary after a DataNode failure

Hadoop's Architecture: DataNode
o Stores the actual data in HDFS
o Can run on any underlying filesystem (ext3/4, NTFS, etc.)
o Notifies the NameNode of what blocks it has
o The NameNode replicates blocks 2x in the local rack, 1x elsewhere

Hadoop's Architecture: MapReduce Engine

MapReduce Engine: JobTracker & TaskTracker
o The JobTracker splits up data into smaller tasks (Map) and sends them to the TaskTracker process on each node
o Each TaskTracker reports back to the JobTracker node on job progress, sends data (Reduce), or requests new jobs

Hadoop's Architecture

o None of these components is necessarily limited to using HDFS
o Many other distributed file systems with quite different architectures work
o Many other software packages besides Hadoop's MapReduce platform make use of HDFS

Hadoop in the Wild
Hadoop is in use at most organizations that handle big data:
o Yahoo!

o Facebook
o Amazon
o Netflix
o etc.

Some examples of scale:
o Yahoo!'s Search Webmap runs on a 10,000-core Linux cluster and powers Yahoo! web search
o Facebook's Hadoop cluster hosts 100+ PB of data (July 2012), growing at ~½ PB/day (Nov 2012)

Hadoop in the Wild
Three main applications of Hadoop:
o Advertisement (mining user behavior to generate recommendations)
o Search (grouping related documents)
o Security (searching for uncommon patterns)

Hadoop in the Wild
Non-realtime large-dataset computing:
o The NY Times was dynamically generating PDFs of articles from 1851-1922
o Wanted to pre-generate & statically serve the articles to improve performance
o Using Hadoop + MapReduce running on EC2 / S3, it converted 4 TB of TIFFs into 11 million PDF articles in 24 hrs

Hadoop in the Wild: Facebook Messages
Design requirements:
o Integrate display of email, SMS and chat messages between pairs and groups of users
o Strong control over who users receive messages from
o Suited for production use by 500 million people immediately after launch
o Stringent latency & uptime requirements

Hadoop in the Wild
System requirements:
o High write throughput
o Cheap, elastic storage
o Low latency
o High consistency (within a single data center is good enough)
o Disk-efficient sequential and random read performance

Hadoop in the Wild
Classic alternatives:
o These requirements are typically met using a large MySQL cluster & caching tiers built on Memcached
o Content on HDFS could be loaded into MySQL or Memcached if needed by the web tier

Problems with the previous solutions:
o MySQL has low random write throughput: a BIG problem for messaging!
o Difficult to scale MySQL clusters rapidly while maintaining performance
o MySQL clusters have high management overhead and require more expensive hardware

Hadoop in the Wild
Facebook's solution:
o Hadoop + HBase as the foundations
o Improve & adapt HDFS and HBase to scale to FB's workload and operational considerations
o A major concern was availability: the NameNode is a SPOF & failover times are at least 20 minutes
o Proprietary AvatarNode: eliminates the SPOF, makes HDFS safe to deploy even with a 24/7 uptime requirement
o Performance improvements for the realtime workload: shorter RPC timeouts, to fail fast and try a different DataNode

Hadoop Highlights

o Distributed file system
o Fault tolerance
o Open data format
o Flexible schema
o Queryable database

Why use Hadoop?

o Need to process multi-petabyte datasets
o Data may not have a strict schema
o Expensive to build reliability into each application

o Nodes fail every day
o Need common infrastructure

Very Large Distributed File System
o Assumes commodity hardware
o Optimized for batch processing
o Runs on heterogeneous OSes

DataNode
A block server:
o Stores data in the local file system

o Stores metadata of a block (e.g. a checksum)
o Serves data and metadata to clients

Block report:
o Periodically sends a report of all existing blocks to the NameNode

Facilitates pipelining of data:
o Forwards data to other specified DataNodes

Block Placement

Replication strategy:
o One replica on the local node
o A second replica on a remote rack
o A third replica on the same remote rack
o Additional replicas are randomly placed
o Clients read from the nearest replica
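A toy Java sketch of the placement strategy above (node and rack names are invented; the real logic lives in the NameNode's block placement policy and also weighs load and topology):

  import java.util.*;

  // Toy sketch of HDFS default placement: replica 1 on the writer's node,
  // replica 2 on a different rack, replica 3 on that same remote rack,
  // any extras at random. All names below are made-up examples.
  public class PlacementSketch {
      record Node(String name, String rack) {}

      static List<Node> chooseTargets(Node writer, List<Node> cluster, int replication) {
          List<Node> targets = new ArrayList<>();
          targets.add(writer);                                   // replica 1: local node
          Node remote = cluster.stream()
              .filter(n -> !n.rack().equals(writer.rack()))
              .findFirst().orElseThrow();
          targets.add(remote);                                   // replica 2: remote rack
          cluster.stream()
              .filter(n -> n.rack().equals(remote.rack()) && !n.equals(remote))
              .findFirst().ifPresent(targets::add);              // replica 3: same remote rack
          Random rnd = new Random();
          while (targets.size() < replication) {                 // extras: random
              Node n = cluster.get(rnd.nextInt(cluster.size()));
              if (!targets.contains(n)) targets.add(n);
          }
          return targets;
      }

      public static void main(String[] args) {
          List<Node> cluster = List.of(
              new Node("dn1", "rackA"), new Node("dn2", "rackA"),
              new Node("dn3", "rackB"), new Node("dn4", "rackB"));
          System.out.println(chooseTargets(cluster.get(0), cluster, 3));
      }
  }

Putting two of the three replicas on one remote rack keeps inter-rack traffic down while still surviving the loss of an entire rack.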

Data Correctness
o Uses checksums (CRC32) to validate data

File creation:
o The client computes a checksum per 512 bytes
o The DataNode stores the checksums

File access:
o The client retrieves the data and checksums from the DataNode
o If validation fails, the client tries other replicas
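A minimal, self-contained sketch of the per-512-byte checksumming idea using java.util.zip.CRC32 (the chunk size comes from the slide; the class and buffer layout are invented for illustration):

  import java.util.zip.CRC32;

  // Sketch: compute a CRC32 checksum per 512-byte chunk of a buffer,
  // mirroring how an HDFS client checksums data as it writes a block.
  public class ChecksumSketch {
      static long[] checksumChunks(byte[] data) {
          int chunks = (data.length + 511) / 512;   // round up to whole chunks
          long[] sums = new long[chunks];
          CRC32 crc = new CRC32();
          for (int i = 0; i < chunks; i++) {
              crc.reset();
              int len = Math.min(512, data.length - i * 512);
              crc.update(data, i * 512, len);
              sums[i] = crc.getValue();   // stored by the DataNode; re-verified on read
          }
          return sums;
      }

      public static void main(String[] args) {
          byte[] data = new byte[1300];             // 3 chunks: 512 + 512 + 276 bytes
          long[] sums = checksumChunks(data);
          System.out.println(sums.length + " chunk checksums computed");
      }
  }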

Data Pipelining
o The client retrieves a list of DataNodes on which to place replicas of a block
o The client writes the block to the first DataNode
o The first DataNode forwards the data to the next DataNode in the pipeline
o When all replicas are written, the client moves on to write the next block in the file
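A toy in-memory sketch of the chain forwarding described above (real DataNodes stream packets over sockets and acknowledge back up the pipeline; all names here are invented):

  import java.util.*;

  // Toy sketch: the client writes a block to the first DataNode only,
  // and each DataNode forwards the block to the next one in the pipeline.
  public class PipelineSketch {
      static class DataNode {
          final String name;
          final DataNode next;                     // next node in the pipeline, or null
          final List<byte[]> blocks = new ArrayList<>();
          DataNode(String name, DataNode next) { this.name = name; this.next = next; }
          void write(byte[] block) {
              blocks.add(block);                   // store locally...
              System.out.println(name + " stored block");
              if (next != null) next.write(block); // ...then forward downstream
          }
      }

      public static void main(String[] args) {
          // Pipeline of three replicas: dn1 -> dn2 -> dn3 (made-up names).
          DataNode dn3 = new DataNode("dn3", null);
          DataNode dn2 = new DataNode("dn2", dn3);
          DataNode dn1 = new DataNode("dn1", dn2);
          dn1.write(new byte[64]);                 // the client touches only dn1
      }
  }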

Hadoop MapReduce
MapReduce programming model:
o Framework for distributed processing of large data sets
o Pluggable user code runs in a generic framework

A common design pattern in data processing:
  cat * | grep | sort    | uniq -c | cat > file
  input | map  | shuffle | reduce  | output
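The canonical word-count job makes this pattern concrete. Below is the standard example for the old org.apache.hadoop.mapred API used throughout these slides, condensed from the classic tutorial; treat it as an illustration, not the exact demo code used later:

  import java.io.IOException;
  import java.util.Iterator;
  import java.util.StringTokenizer;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.*;

  // Word count against the classic org.apache.hadoop.mapred API.
  public class WordCount {

      // Map phase: emit (word, 1) for every token in the input line.
      public static class Map extends MapReduceBase
              implements Mapper<LongWritable, Text, Text, IntWritable> {
          private final static IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();
          public void map(LongWritable key, Text value,
                          OutputCollector<Text, IntWritable> output, Reporter reporter)
                  throws IOException {
              StringTokenizer tokens = new StringTokenizer(value.toString());
              while (tokens.hasMoreTokens()) {
                  word.set(tokens.nextToken());
                  output.collect(word, ONE);
              }
          }
      }

      // Reduce phase: sum the counts the shuffle grouped under each word.
      public static class Reduce extends MapReduceBase
              implements Reducer<Text, IntWritable, Text, IntWritable> {
          public void reduce(Text key, Iterator<IntWritable> values,
                             OutputCollector<Text, IntWritable> output, Reporter reporter)
                  throws IOException {
              int sum = 0;
              while (values.hasNext()) sum += values.next().get();
              output.collect(key, new IntWritable(sum));
          }
      }

      public static void main(String[] args) throws IOException {
          JobConf conf = new JobConf(WordCount.class);
          conf.setJobName("wordcount");
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class);
          conf.setMapperClass(Map.class);
          conf.setReducerClass(Reduce.class);
          FileInputFormat.setInputPaths(conf, new Path(args[0]));
          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
          JobClient.runJob(conf);   // submits via JobClient.submitJob and polls
      }
  }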

MapReduce usage:
o Log processing
o Web search indexing
o Ad-hoc queries

Closer Look: MapReduce Components
o JobClient

o JobTracker
o TaskTracker
o Child

Job Creation/Execution Process
The MapReduce process (org.apache.hadoop.mapred):
o JobClient: submits the job

o JobTracker: manages and schedules the job, splits the job into tasks
o TaskTracker: starts and monitors task execution
o Child: the process that really executes the task

Inter-Process Communication

IPC/RPC (org.apache.hadoop.ipc) protocols:
o JobSubmissionProtocol: JobClient <-------------> JobTracker
o InterTrackerProtocol: TaskTracker <-------------> JobTracker
o TaskUmbilicalProtocol: TaskTracker <-------------> Child

The JobTracker implements both JobSubmissionProtocol and InterTrackerProtocol and acts as the server in both IPCs. The TaskTracker implements the TaskUmbilicalProtocol; the Child gets task information and reports task status through it.
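A condensed illustration of the pattern all three protocols follow in the 0.20-era IPC layer. PingProtocol and its method are invented for this sketch; only the RPC.getServer/RPC.getProxy calls mirror the real API quoted in the slides below:

  import java.net.InetSocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.ipc.RPC;
  import org.apache.hadoop.ipc.VersionedProtocol;

  // Invented protocol showing the client/server pattern of Hadoop 0.20 IPC.
  interface PingProtocol extends VersionedProtocol {
      long VERSION_ID = 1L;
      String ping(String msg);   // hypothetical method
  }

  public class IpcSketch implements PingProtocol {
      public String ping(String msg) { return "pong: " + msg; }
      public long getProtocolVersion(String protocol, long clientVersion) {
          return VERSION_ID;
      }

      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // Server side (e.g. JobTracker): expose the protocol implementation.
          RPC.Server server = RPC.getServer(new IpcSketch(), "localhost", 9999, conf);
          server.start();
          // Client side (e.g. JobClient): obtain a proxy and invoke remotely.
          PingProtocol proxy = (PingProtocol) RPC.getProxy(PingProtocol.class,
                  PingProtocol.VERSION_ID, new InetSocketAddress("localhost", 9999), conf);
          System.out.println(proxy.ping("hello"));
          server.stop();
      }
  }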

JobClient.submitJob - 1
Check input and output, e.g. check whether the output directory already exists:
  job.getInputFormat().validateInput(job);
  job.getOutputFormat().checkOutputSpecs(fs, job);
Get the InputSplits, sort them, and write them to HDFS:
  InputSplit[] splits = job.getInputFormat().getSplits(job, job.getNumMapTasks());
  writeSplitsFile(splits, out); // out is $SYSTEMDIR/$JOBID/job.split

JobClient.submitJob - 2
The jar file and configuration file are uploaded to the HDFS system directory:
  job.write(out); // out is $SYSTEMDIR/$JOBID/job.xml
  JobStatus status = jobSubmitClient.submitJob(jobId);
The submitJob call is an RPC invocation; jobSubmitClient is a proxy created during initialization.

Job initialization on JobTracker - 1
JobTracker.submitJob(jobID) <-- receives the RPC invocation request:
  JobInProgress job = new JobInProgress(jobId, this, this.conf);
Add the job to the job queues:
  jobs.put(job.getProfile().getJobId(), job);
  jobsByPriority.add(job);
  jobInitQueue.add(job);

Job initialization on JobTracker - 2
Sort by priority:
  resortPriority(); // compares JobPriority first, then JobSubmissionTime
Wake the JobInitThread:
  jobInitQueue.notifyAll();
  job = jobInitQueue.remove(0);
  job.initTasks();

JobInProgress - 1
  JobInProgress(String jobid, JobTracker jobtracker, JobConf default_conf);
JobInProgress.initTasks():
  DataInputStream splitFile = fs.open(new Path(conf.get("mapred.job.split.file")));
  // mapred.job.split.file --> $SYSTEMDIR/$JOBID/job.split

JobInProgress - 2
  splits = JobClient.readSplitFile(splitFile);
  numMapTasks = splits.length;
  maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i);
  reduces[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i);
JobStatus --> JobStatus.RUNNING

JobTracker Task Scheduling - 1
  Task getNewTaskForTaskTracker(String taskTracker)
Compute the maximum number of tasks that can run on taskTracker:
  int maxCurrentMapTasks = tts.getMaxMapTasks();
  int maxMapLoad = Math.min(maxCurrentMapTasks,
      (int) Math.ceil((double) remainingMapLoad / numTaskTrackers));
(For example, with remainingMapLoad = 10 and numTaskTrackers = 4, maxMapLoad = min(maxCurrentMapTasks, ceil(2.5)) = min(maxCurrentMapTasks, 3).)

JobTracker Task Scheduling - 2
  int numMaps = tts.countMapTasks(); // number of currently running map tasks
If numMaps < maxMapLoad, more tasks can be allocated: based on priority, pick the first job from the jobsByPriority queue, create a task, and return it to the TaskTracker:

  Task t = job.obtainNewMapTask(tts, numTaskTrackers);

Start TaskTracker - 1
initialize():
o Remove the original local directory
o RPC initialization:
  taskReportServer = RPC.getServer(this, bindAddress, tmpPort, max, false, this, fConf);
  InterTrackerProtocol jobClient = (InterTrackerProtocol)
      RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID,
                       jobTrackAddr, this.fConf);

Start TaskTracker - 2
  run();
  offerService();
The TaskTracker talks to the JobTracker with a periodic heartbeat message:
  HeartbeatResponse heartbeatResponse = transmitHeartBeat();

Run Task on TaskTracker - 1
  TaskTracker.localizeJob(TaskInProgress tip);
  launchTasksForJob(tip, new JobConf(rjob.jobFile));
  tip.launchTask();       // TaskTracker.TaskInProgress
  tip.localizeTask(task); // create folder, symbolic link
  runner = task.createRunner(TaskTracker.this);
  runner.start();         // start the TaskRunner thread

  TaskRunner.run();

Run Task on TaskTracker - 2
Configure the child process's JVM parameters, i.e. classpath, taskid, taskReportServer's address & port, then start the child process:
  runChild(wrappedCommand, workDir, taskid);

Child.main()

Create the RPC proxy, and execute the RPC invocation:
  TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)
      RPC.getProxy(TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
                   address, defaultConf);
  Task task = umbilical.getTask(taskid);
  task.run(); // MapTask.run / ReduceTask.run

Finish Job - 1
Child:
  task.done(umbilical); // RPC call: umbilical.done(taskId, shouldBePromoted)
TaskTracker:
  done(taskId, shouldPromote);
  TaskInProgress tip = tasks.get(taskid);
  tip.reportDone(shouldPromote);
  taskStatus.setRunState(TaskStatus.State.SUCCEEDED);

Finish Job - 2
JobTracker:
TaskStatus report:
  status.getTaskReports();
  TaskInProgress tip = taskidToTIPMap.get(taskId);
JobInProgress updates the JobStatus:
  tip.getJob().updateTaskStatus(tip, report, myMetrics);

One task of the current job is finished:
  completedTask(tip, taskStatus, metrics);
  if (this.status.getRunState() == JobStatus.RUNNING && allDone) {
      this.status.setRunState(JobStatus.SUCCEEDED);
  }

Demo
Word count:
  hadoop jar hadoop-0.20.2-examples.jar wordcount <input> <output>
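A full run might look like this (the HDFS paths are hypothetical; part-00000 is the first reducer's output file):

  hadoop fs -put books/ /user/demo/input
  hadoop jar hadoop-0.20.2-examples.jar wordcount /user/demo/input /user/demo/output
  hadoop fs -cat /user/demo/output/part-00000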

Hive:
  hive -f pagerank.hive
