Apache Hadoop is a software solution for distributed computing of large datasets. Hadoop provides a distributed filesystem (HDFS) and a MapReduce implementation.
The Hadoop project is organized into 4 projects:
About 2.5 million lines of code from 81 contributors. Contributions are roughly 60% Hortonworks, 30% Cloudera and 10% Yahoo. The original author, Doug Cutting, currently works at Cloudera.
Designed for scalability and fault tolerance. In a 1000-node cluster, the MTTF (mean time to failure) is less than 1 day. A 1000-node cluster in which each machine has 16GB RAM and a 1TB disk has 16TB of memory and 1 Petabyte of disk in total. Hadoop is designed to process Petabytes of data.
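As a rough check of the figures above, the totals follow from simple multiplication. The sketch below is illustrative only: the function names are made up, decimal units are assumed (1 TB = 1000 GB), and the per-node MTTF of 730 days is a hypothetical value. Node failures are assumed to be independent, so that the cluster MTTF is roughly the node MTTF divided by the node count.

```python
def cluster_totals(nodes, ram_gb_per_node, disk_tb_per_node):
    """Return (total RAM in TB, total disk in PB) using decimal units."""
    total_ram_tb = nodes * ram_gb_per_node / 1000
    total_disk_pb = nodes * disk_tb_per_node / 1000
    return total_ram_tb, total_disk_pb


def cluster_mttf_days(nodes, node_mttf_days):
    """Approximate time to the first failure anywhere in the cluster.

    Assumes independent nodes with a constant failure rate, so the
    failure rates add up and the MTTF shrinks by a factor of `nodes`.
    """
    return node_mttf_days / nodes


if __name__ == "__main__":
    print(cluster_totals(1000, 16, 1))    # RAM in TB, disk in PB
    print(cluster_mttf_days(1000, 730))   # 730 days per node is a made-up input
```

With these assumptions, a machine that fails once every two years on average still means some node in a 1000-node cluster fails in under a day.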
HDFS exposes block placement so that computation can be moved to data.
See http://ercoppa.github.io/HadoopInternals/
---------------------------------------------------------------------
Map Reduce Framework
---------------------------------------------------------------------
YARN HDFS Storage
Infra Manager Federation (S3, Disk, etc)
---------------------------------------------------------------------
Cluster
---------------------------------------------------------------------
The YARN Infrastructure (Yet Another Resource Negotiator) is the framework responsible for providing the computational resources (e.g., CPUs, memory, etc.) needed for application executions. YARN is independent of HDFS – it does not manage storage and is not required by HDFS.
YARN uses a Resource Manager (the master, one per cluster) and Node Managers (one per node).
A job is submitted to the YARN Resource Manager (which is a single point of failure), which schedules the job using the Node Managers.
The Application Master is responsible for the execution of a single application. It asks the Resource Scheduler (Resource Manager) for containers and executes specific programs (e.g., the main of a Java class) on the obtained containers.
The Application Master knows the application logic and thus it is framework-specific.
The MapReduce framework provides its own implementation of an Application Master.
--------------------------------------------------------------------------
Job Submitter
--------------------------------------------------------------------------
YARN Resource Manager (uses Resource Scheduler)
--------------------------------------------------------------------------
Node Manager (One per slave)
--------------------------------------------------------------------------
Container (Many per node. Each container represents vcores, memory)
--------------------------------------------------------------------------
Application Master runs on a container to execute part of the Job.
--------------------------------------------------------------------------
HDFS used for input/output but not for intermediate storage by MapReduce.
Map is inherently parallel. Reduce is inherently sequential (with scope for parallelism in some cases).
When a client submits an application, several kinds of information are provided to the YARN infrastructure. In particular:
- A configuration: this may be partial (default values are used when absent). Notice that these default values may be the ones chosen by a Hadoop provider like Amazon.
- A JAR containing:
  - a map() implementation
  - a combiner implementation
  - a reduce() implementation
- Input and output information:
  - input directory: is the input directory on HDFS? On S3? How many files?
  - output directory: where will we store the output? On HDFS? On S3?
The number of files inside the input directory is a factor for deciding the number of Map Tasks of a job.
The Application Master will launch one MapTask for each map split. Typically, there is a map split for each input file. If the input file is too big (bigger than the HDFS block size) then we have two or more map splits associated to the same input file.
The MapReduce Application Master asks the Resource Manager for the Containers needed by the Job: one MapTask container request for each MapTask (map split).
A container request for a MapTask tries to exploit data locality of the map split.
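The relation between input files, block size and the number of MapTasks described above can be sketched as follows. This is a simplified model, not Hadoop's actual split logic: it ignores the configurable minimum/maximum split sizes and other details, the function names are hypothetical, and the 128 MB block size is an assumed value.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size in bytes


def map_splits_for_file(file_size, block_size=BLOCK_SIZE):
    """One split per block-sized chunk, with at least one split per file."""
    return max(1, math.ceil(file_size / block_size))


def count_map_tasks(file_sizes, block_size=BLOCK_SIZE):
    """Total MapTasks for a job: one per map split across all input files."""
    return sum(map_splits_for_file(size, block_size) for size in file_sizes)


if __name__ == "__main__":
    mb = 1024 * 1024
    # Three input files: 10 MB, exactly one block, and 300 MB.
    print(count_map_tasks([10 * mb, 128 * mb, 300 * mb]))
```

In this model the 10 MB and 128 MB files each produce one split, and the 300 MB file produces three, so many small files inflate the number of MapTasks just as one very large file does.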
The Application Master asks for:
Map output is buffered in memory and may spill over to local disk when the buffer overflows. If there is a spill, it is written as multiple files, one per partition (i.e., one per reducer). The combiner function is run as a local reducer. Finally, the output file partitions are made available to the reducers over HTTP.
Note: There is file system overhead in the communication from Map task to Reduce task if the output file size is more than 25% of NodeManager memory. Otherwise the output is kept in memory.
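The role of the combiner as a local reducer can be illustrated with a small word-count sketch. This is plain Python, not the Hadoop API; all function names are made up for illustration, and the "shuffle" is just a list.

```python
from collections import Counter


def map_words(line):
    """Map step: emit (word, 1) for every word in a line."""
    for word in line.split():
        yield word, 1


def combine(pairs):
    """Combiner: pre-aggregate the output of ONE map task locally."""
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return sorted(local.items())


def reduce_counts(pairs):
    """Reduce step: aggregate the (already combined) pairs from all map tasks."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


def run_job(splits):
    """Run map + combine per split, then a single reduce over everything."""
    shuffled = []
    for lines in splits:
        map_output = [pair for line in lines for pair in map_words(line)]
        shuffled.extend(combine(map_output))  # fewer tuples cross the network
    return reduce_counts(shuffled)


if __name__ == "__main__":
    print(run_job([["a b a"], ["b b"]]))
```

Because the combiner collapses repeated keys inside each map task, the first split sends two tuples instead of three to the reducer, while the final counts stay the same.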
You can configure uber mode, which runs a MapReduce job for a small dataset inside a single JVM. An uber task means that the ApplicationMaster uses its own JVM to run the Map and Reduce tasks, so the tasks are executed sequentially on one node. In this case YARN has to allocate a container for the ApplicationMaster only.
You can set mapreduce.job.ubertask.enable=true; to enable uberization for small jobs. An example Hive session that runs an uberized task with no reducer at all:
set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000; # Anything which requires less than 1GB can be uberized.
set mapreduce.reduce.memory.mb=1000;
SELECT 'A' FROM dual WHERE 1=1;
Depending on the application, you may want N reducers because you have N computing resources, or a single reducer if the application needs a global sort of all output. There is a default behaviour, and you can customize it in the application.
The number of ReduceTasks for the job is decided by the configuration parameter mapreduce.job.reduces.
What is the partitionIdx associated to an output tuple?
The partitionIdx of an output tuple is the index of a partition. It is decided inside Mapper.Context.write():
partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers
It is stored as metadata in the circular buffer alongside the output tuple. The user can customize the partitioner by setting the configuration parameter mapreduce.job.partitioner.class.
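The formula above can be tried out in a few lines. The sketch below is an illustration, not Hadoop code: java_string_hash mimics java.lang.String.hashCode only as a stand-in for key.hashCode() (real Hadoop key types define their own hashCode), and both function names are hypothetical.

```python
def java_string_hash(text):
    """Signed 32-bit hash in the style of java.lang.String.hashCode.

    Matches Java for characters in the Basic Multilingual Plane.
    """
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - (1 << 32) if h >= (1 << 31) else h


def partition_index(key_hash, num_reducers):
    """(hash & Integer.MAX_VALUE) % numReducers, as in the formula above."""
    return (key_hash & 0x7FFFFFFF) % num_reducers


if __name__ == "__main__":
    for key in ["hadoop", "hdfs", "yarn"]:
        print(key, partition_index(java_string_hash(key), 4))
```

Masking with Integer.MAX_VALUE clears the sign bit, so a negative hash code still maps to a valid non-negative partition index.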
There is a single “name node” (a single point of failure) and many datanodes.
For search assist, we need auto completion.
Insight: Related concepts appear close together in text corpus
* Input: Web pages
* 1 Billion Pages, 10K bytes each
* 10 TB of input data
* Output: List(word, List(related words))
Think MapReduce:
* Record = (Key, Value)
* Key : Comparable, Serializable
* Value: Serializable
* Input, Map, Shuffle, Reduce, Output
Pseudo Code:
# Search Assist
// Input: List(URL, Text)
foreach URL in Input :
    Tokens = Tokenize(Text(URL));
    foreach word in Tokens :
        Insert (word, Next(word, Tokens)) in Pairs;
        Insert (word, Previous(word, Tokens)) in Pairs;
// Result: Pairs = List (word, RelatedWord)
GroupedPairs = Group Pairs by word;
// Result: List (word, List(RelatedWords))
foreach word in GroupedPairs :
    Count RelatedWords in GroupedPairs into CountedPairs;
// Result: List (word, List(RelatedWords, count))
foreach word in CountedPairs :
    Sort Pairs(word, *) descending by count;
    choose Top 5 Pairs;
// Result: List (word, Top5(RelatedWords))
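The pseudo code can be turned into a small single-process Python sketch. It is only meant to show the data flow (map to pairs, group by word, count, top 5); the tokenizer, the function names and the sample page are assumptions, and a real job would distribute the map and reduce steps across the cluster.

```python
import re
from collections import Counter, defaultdict


def map_pairs(text):
    """Map step: emit (word, neighbour) for the next and previous word."""
    tokens = re.findall(r"[a-z]+", text.lower())
    for i, word in enumerate(tokens):
        if i + 1 < len(tokens):
            yield word, tokens[i + 1]
        if i > 0:
            yield word, tokens[i - 1]


def search_assist(pages, top_n=5):
    """Group pairs by word, count related words, keep the top_n per word."""
    grouped = defaultdict(Counter)          # word -> Counter(related word)
    for _url, text in pages:
        for word, related in map_pairs(text):
            grouped[word][related] += 1
    return {
        word: [related for related, _count in counts.most_common(top_n)]
        for word, counts in grouped.items()
    }


if __name__ == "__main__":
    pages = [("http://example.invalid/1", "big data big cluster")]
    print(search_assist(pages))
```

In MapReduce terms, map_pairs is the map function, the grouping by word is the shuffle, and the counting plus top-5 selection is the reduce function.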
- Install hadoop common using document:
- http://hadoop.apache.org/common/docs/r0.17.1/quickstart.html
By default, Hadoop is configured to run things in a non-distributed mode, as a single Java process. This is useful for debugging.
The following example copies the unpacked conf directory to use as input and then finds and displays every match of the given regular expression. Output is written to the given output directory:
$ mkdir input
$ cp conf/*.xml input
$ bin/hadoop jar hadoop-*examples.jar grep input output 'dfs[a-z.]+'
$ cat output/*
1 dfsadmin
Note: It searched all files in input directory and displayed the grep output
Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process.
Hadoop’s Java configuration is driven by two types of important configuration files:
The configuration files (in 2.7.* releases) are:
/opt/hadoop/hadoop/etc $find . -name '*xml*'
./hadoop/capacity-scheduler.xml
./hadoop/kms-acls.xml
./hadoop/yarn-site.xml
./hadoop/hadoop-policy.xml
./hadoop/kms-site.xml
./hadoop/httpfs-site.xml
./hadoop/hdfs-site.xml
./hadoop/core-site.xml
etc/hadoop/slaves file lists all slave hostnames. By default it contains just 'localhost'.
Now check that you can ssh to the localhost without a passphrase:
$ ssh localhost
If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
Execution
Format a new distributed-filesystem:
$ bin/hadoop namenode -format
Start the hadoop daemons:
$ bin/start-all.sh
The hadoop daemon log output is written to the ${HADOOP_LOG_DIR} directory
(defaults to ${HADOOP_HOME}/logs).
Browse the web-interface for the NameNode and the JobTracker, by default they
are available at:
NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/
Copy the input files into the distributed filesystem:
$ bin/hadoop dfs -put conf input
Run some of the examples provided:
$ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
Examine the output files:
Copy the output files from the distributed filesystem to the local filesystem
and examine them:
$ bin/hadoop dfs -get output output
$ cat output/*
or
View the output files on the distributed filesystem:
$ bin/hadoop dfs -cat output/*
When you're done, stop the daemons with:
$ bin/stop-all.sh
Namenode - A daemon which tracks the available nodes and the locations of file replicas. Single point of failure.
Datanode - One or more nodes where HDFS data resides.
ZooKeeper - Used for HBase HMaster HA, HDFS 2.0 NameNode HA and YARN Resource Manager HA.
Application master - Runs on any slave node as the job driver, one per job. The MapReduce framework provides an implementation of the application master.
Script Name               Functionality
bin/hadoop                "hadoop fs" invokes the hdfs client (dump or put files). "hadoop jar" invokes a MapReduce job.
                          For a yarn application, use "yarn jar" instead of "hadoop jar". Config files are in the ../etc dir.
                          "hadoop fs" can mix files from hdfs, s3, local, etc. file systems, whereas "hdfs dfs" is for hdfs only.
                          See Also:
                          http://stackoverflow.com/questions/18142960/whats-the-difference-between-hadoop-fs-shell-commands-and-hdfs-dfs-shell-co
bin/hdfs                  hdfs management script. Supported commands include:
                            dfs                 run a filesystem command on the file systems supported in Hadoop.
                            namenode -format    format the DFS filesystem and/or invoke namenode daemon
                            datanode            run a DFS datanode
                            dfsadmin            E.g. hdfs dfsadmin -report ; lists all running name and data nodes.
                            nfs3                run an NFS version 3 gateway
                            getconf -namenodes  Get list of namenodes in cluster.
                            etc.
sbin/start-dfs.sh         Start namenode daemon and datanode daemon.
sbin/hadoop-daemon.sh     High-level script to start any daemon like namenode/datanode/etc.
                          Invokes a lower-level script like hdfs after initializing proper env variables, etc.
sbin/hadoop-daemons.sh    High-level script to start any daemon on all slaves as specified in the etc/hadoop/slaves file.
bin/yarn                  Invokes the yarn resource manager (master, one per cluster) or node manager (one per slave).
sbin/start-yarn.sh        Run this on the master node to start both resource manager and node manager.
sbin/yarn-daemon.sh       High-level script to invoke bin/yarn after setting up proper env variables.
sbin/yarn-daemons.sh      High-level script to invoke bin/yarn on all nodes as configured in the etc/hadoop/slaves file.
hadoop@hadoopnode1:~/hadoop-0.20.2/conf$ jps
9923 Jps
7555 NameNode
8133 TaskTracker
7897 SecondaryNameNode
7728 DataNode
7971 JobTracker
See ‘default.xml‘ files in hadoop/ install dir.
NameNode - http://localhost:50070/; For https 50470
DataNode - http://localhost:50075/; For https 50475; For data transfer port 50010
JobTracker - http://localhost:50030/
$ mkdir input
$ cp etc/hadoop/*.xml input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar grep input output 'dfs[a-z.]+'
$ cat output/*
mapreduce.framework.name can be one of local, classic or yarn.
HBase is an open source, non-relational (NoSQL), column-oriented, distributed database modeled after Google’s BigTable and written in Java. Part of the Hadoop ecosystem. It provides mutable data on top of immutable HDFS. Phoenix gives a SQL JDBC interface to HBase.
HBASE components:
Can scale to millions of columns. Versioned data by time. Each entry is: ( Table, Row_key, Family, Column, Timestamp) = Cell-Value
A key-range is called a region.
Example
( 'key1', 'Animal-Family', 'Size', 1:00PM ) = 'Small'
( 'key1', 'Animal-Family', 'Size', 2:00PM ) = 'Big'
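The versioned cell model in the example can be mimicked with a small in-memory sketch. This is not the HBase client API: the class, its methods and the use of minutes since midnight as timestamps are all assumptions made for illustration.

```python
from collections import defaultdict


class VersionedTable:
    """Toy model: (row_key, family, column) -> {timestamp: value}."""

    def __init__(self):
        self._cells = defaultdict(dict)

    def put(self, row_key, family, column, timestamp, value):
        """Store a new version of a cell; older versions are kept."""
        self._cells[(row_key, family, column)][timestamp] = value

    def get(self, row_key, family, column, timestamp=None):
        """Return the newest value at or before `timestamp` (latest if None)."""
        versions = self._cells.get((row_key, family, column), {})
        candidates = [ts for ts in versions if timestamp is None or ts <= timestamp]
        if not candidates:
            return None
        return versions[max(candidates)]


if __name__ == "__main__":
    table = VersionedTable()
    table.put("key1", "Animal-Family", "Size", 13 * 60, "Small")  # 1:00PM
    table.put("key1", "Animal-Family", "Size", 14 * 60, "Big")    # 2:00PM
    print(table.get("key1", "Animal-Family", "Size"))               # latest version
    print(table.get("key1", "Animal-Family", "Size", 13 * 60 + 30)) # as of 1:30PM
```

A read without a timestamp returns the most recent version, while a read with a timestamp returns the value that was current at that time.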
Example Table Creation
hbase> create 't1', 'f1', 'f2' # Creates table t1 with f1 and f2 as column families.
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'} # supply some configs.
hbase> put 't1', 'id1', 'f1:city', 'Boston' # Insert column 'f1:city' with ID as 'id1' with value 'Boston'
hbase> list # List tables
hbase> scan 't1' # Dump all
HBase Tables are divided horizontally by row key range into ‘Regions.’ A region contains all rows in the table between the region’s start key and end key. Regions are assigned to the nodes in the cluster, called ‘Region Servers’, and these serve data for reads and writes. A region server can serve about 1,000 regions.
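Finding the region server for a row key amounts to a search over sorted region boundaries. The following is a minimal sketch, not HBase's actual lookup code: the class name, the split keys and the server names are hypothetical, and regions are assumed to be half-open ranges (start key inclusive, end key exclusive).

```python
import bisect


class RegionMap:
    """Maps a row key to the server holding the region that covers it."""

    def __init__(self, split_keys, servers):
        # N sorted split keys define N + 1 regions:
        # (-inf, k0), [k0, k1), ..., [k_last, +inf)
        if len(servers) != len(split_keys) + 1:
            raise ValueError("need exactly one server entry per region")
        if list(split_keys) != sorted(split_keys):
            raise ValueError("split keys must be sorted")
        self.split_keys = list(split_keys)
        self.servers = list(servers)

    def server_for(self, row_key):
        """Binary search for the region whose key range contains row_key."""
        index = bisect.bisect_right(self.split_keys, row_key)
        return self.servers[index]


if __name__ == "__main__":
    regions = RegionMap(["g", "n"], ["rs1", "rs2", "rs3"])
    for key in ["apple", "g", "mango", "zebra"]:
        print(key, "->", regions.server_for(key))
```

Because rows are sorted by key, a scan over a key range only needs to contact the servers whose regions overlap that range.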
Problems with HBASE: Business continuity reliability:
Hive is a tool which provides a SQL interface (HQL) for manipulating Hadoop data in HDFS. It generates MapReduce code from the given SQL. It is not a database itself. Part of the Hadoop ecosystem. Written in Java.
Unlike Hive, HBase operations run in real time on its database rather than as MapReduce jobs, so comparing Hive with HBase is not an apples-to-apples comparison. You can compare Hive with a Pig script, which also generates MapReduce jobs for Hadoop data.
Hive enables developers not familiar with MapReduce to write data queries that are translated into MapReduce jobs in Hadoop.
Despite providing SQL functionality, Hive does not provide interactive querying yet - it only runs batch processes on Hadoop.
SQL queries are submitted to Hive and they are executed as follows:
Clients communicate with HiveServer2 over a JDBC/ODBC connection. There can be multiple HiveServer2 daemons for HA, managed by ZooKeeper. HiveServer2 has an embedded metastore (using Derby to cache meta info) apart from the external MetastoreDB.
Display all config parameters using set command
hive> set # displays all config parameters
hive> show databases;
hive> use xademo;
hive> show tables;
hive> show create table customer_details;
The following default hive property indicates that the meta-data is stored in MySQL by default:
javax.jdo.option.ConnectionURL=jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true
See Also: Spark SQL which uses Hive for metastore and frontend.
The Pig platform includes an execution environment and a scripting language (Pig Latin) used to analyze Hadoop data sets. Its compiler translates Pig Latin into sequences of MapReduce programs.
Sometimes it is easier to write a Pig script than Hive SQL (when the job involves some sequential steps). Sometimes it is easier to write SQL than a Pig script. Example:
select * from X order by Y limit 10
There is no way to execute the above query efficiently using a Pig script without sorting the entire data, which can be too expensive to be practical. A hand-coded MapReduce job can do this optimization well. An Apache Spark program may also be able to do this efficiently.
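The optimization a hand-coded job can apply to an "order by Y limit 10" query is to keep only the N smallest rows per partition and then merge those small partial results, instead of sorting everything. A minimal Python sketch of that idea follows; the function names and the row layout are assumptions, and the partitions stand in for map task inputs.

```python
import heapq


def top_n_per_partition(rows, n, key):
    """Map side: keep only the n smallest rows of one partition."""
    return heapq.nsmallest(n, rows, key=key)


def top_n(partitions, n, key):
    """Reduce side: merge the per-partition candidates and keep n rows."""
    partials = [top_n_per_partition(rows, n, key) for rows in partitions]
    candidates = (row for partial in partials for row in partial)
    return heapq.nsmallest(n, candidates, key=key)


if __name__ == "__main__":
    partitions = [
        [{"y": 5}, {"y": 1}, {"y": 9}],
        [{"y": 3}, {"y": 2}, {"y": 8}],
    ]
    # Equivalent in spirit to: select * from X order by Y limit 2
    print(top_n(partitions, 2, key=lambda row: row["y"]))
```

Each partition contributes at most N rows to the final merge, so the amount of data shuffled is bounded by N times the number of partitions rather than by the size of the table.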
NOTE: MapReduce jobs can store into HBASE, but HBASE does not trigger MapReduce Jobs.
Apache Ambari is a web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari leverages open source tools such as Ganglia for metrics collection and Nagios for system alerting (e.g. sending emails when a node goes down, remaining disk space is low, etc). Use puppet for installation.
Supports blueprints as templates.
Hue, a Hadoop user interface tool, was the initial UI-based Hadoop cluster management tool from Cloudera. It is written in Python/Django. Nowadays Hortonworks’ Ambari works well and is better overall. Some features, like the HBase browser, are available only in Hue.
Configuration repository – used by cluster managers. Nodes are accessed with file-system like paths. Each node has associated data limited to max 1 MB.
A single point of failure is avoided by giving the client the full list of ZooKeeper servers, i.e. the ZooKeeper URL is like srv1,srv2,srv3, so failover is handled automatically by the client itself. As long as a majority of the servers are available, the ZooKeeper service will be available.
Read requests are processed locally. Write requests are forwarded to other ZooKeeper servers and go through consensus. Thus, the throughput of read requests scales with the number of servers and the throughput of write requests decreases with the number of servers.
Projects using ZooKeeper:
* The HDFS 2.0 NameNode uses ZooKeeper for HA. The first HDFS version 1.0 did not use ZooKeeper.
* The HBase Master Server daemon uses ZooKeeper, which is co-located with the HDFS NameNode.
* Mesos - the cluster management tool uses ZooKeeper to detect partitioning of nodes.
* YARN - the MapReduce 2.0 resource scheduler uses ZooKeeper to detect partitioning and for config sharing.
For distributed cron job locking, you can use ZooKeeper:
* http://zookeeper.apache.org/doc/r3.2.2/recipes.html
* https://aws.amazon.com/blogs/compute/scheduling-ssh-jobs-using-aws-lambda/
* Chronos: Fault tolerant job scheduler for Mesos - https://mesos.github.io/chronos/
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
Typically to transfer file to HDFS, you do:
$ hadoop fs -put /path of the required file /path in HDFS where to save the file
But the above command does not stream continuous data.
Flume, in contrast, enables streaming of log data into HDFS.
Scribe is a log collection framework over Thrift (language independent protocol definition framework), built and open sourced by Facebook. Another alternative to Flume to stream log data to HDFS. Flume is more popular. Note: Even Kafka can be used to stream data into Hadoop as an alternative.
Cassandra is an open source distributed database management system and a columnar database. At Twitter, they use HBase for OLAP and Cassandra for OLTP. It is written in Java and inspired by Amazon Dynamo. It provides eventual consistency. Twitter migrated from MySQL to Cassandra.
It is not as performant as Redis/Memcache (written in C), which are key/value based databases. Cassandra provides a SQL-like query language (CQL) to access data but Redis does not. If data is inherently key/value or requires high consistency then Redis is better.
Querying by key, or key range (secondary indices are also available). You can write triggers in Java.
Apache Spark, Cassandra and Kafka together form a common, recent industry stack.
An open source distributed, fault-tolerant graph database for managing data at webscale.
Thrift is an interface definition language and binary communication protocol that is used to define and create services for numerous languages. Thrift includes a complete stack for creating clients and servers.
Apache Sqoop(TM) is a tool to transfer data between Hadoop and RDBMS (and other structured databases).
Apache Drill is an open source, low-latency query engine for big data:
SQL interface to HBASE. Originally developed by Cloudera.
Apache Impala is the open source, native analytic database for Apache Hadoop. Impala is shipped by Cloudera, MapR, Oracle, and Amazon.
Provides a HiveQL-compatible query language. Faster querying of HDFS data without the typical MapReduce overhead that comes with HiveQL. Note: Unlike Hive, it does not convert the query into an MR job.
Another Impala alternative (incubating) ??
A toolkit installed with cluster for management of services. Used with Apache Storm mainly.
Knox is a system which provides single point of authentication for hadoop services.
See Also: http://www.slideshare.net/ToddPalino/kafka-at-scale-multitier-architectures
Partitions : A partition, or more specifically topic partition, is the unit of parallelism in Kafka.
Replicas : In Kafka, replication is implemented at the partition level.
Brokers : A typical Kafka installation (or cluster) consists of one or more servers,
hosting partitions belonging to different topics.
Zookeeper serves as the metadata store for Kafka cluster. Zookeeper is a required component.
Simple, highly available distributed configuration manager. A ZooKeeper client URL looks like: zk://host1:port1,host2:port2,host3:port3, so clients will connect to any server which is available. ZooKeeper servers are deployed in odd numbers (e.g. 3, 5, 7) so that a majority quorum can elect a leader.
It maintains a file system like name space where each node is associated with some values. Clients can subscribe to watch specific nodes to get notified. Can use heartbeats to keep track of live nodes associated with nodes in zk tree.
Other controllers/servers can make use of zookeeper to elect leaders - Zookeeper itself does not elect leader for arbitrary external cluster of servers. Multiple clusters of servers can use single Zookeeper installation to elect their group leaders.
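The preference for an odd number of ZooKeeper servers follows from majority arithmetic, which the short sketch below works through. The function names are made up for illustration.

```python
def quorum_size(servers):
    """Smallest number of servers that forms a strict majority."""
    return servers // 2 + 1


def tolerated_failures(servers):
    """How many servers can fail while a majority is still available."""
    return servers - quorum_size(servers)


if __name__ == "__main__":
    for n in (3, 4, 5, 6, 7):
        print(n, "servers: quorum", quorum_size(n),
              "tolerates", tolerated_failures(n), "failures")
```

Going from 3 to 4 servers (or from 5 to 6) raises the quorum size without tolerating any additional failure, so the extra even-numbered server adds write cost but no availability.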
Use Cases:
Hosted message service – Kafka alternative.
Apache Storm is a distributed computation framework written predominantly in the Clojure programming language. It is an alternative to Apache Spark.
Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Apache Spark is written in Scala and makes use of Akka in its implementation.
In a Storm topology, a Spout is the source of data streams (e.g. Kafka) and a Bolt holds the business logic for analyzing and processing those streams. That is, Kafka acts more like a data stream router and Storm is more like a processor.
See: http://hortonworks.com/blog/storm-kafka-together-real-time-data-refinery/
Also See: Storm Blueprints by Taylor Goetz
See http://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming
One key difference between Spark and Storm is that Spark performs data-parallel computations while Storm performs task-parallel computations. Spark Streaming supports micro-batching whereas Storm is record-at-a-time. A fairer comparison is with Storm Trident, which supports micro-batching.
MongoDB is a document-oriented NoSQL database written in C++. It is highly scalable and uses sharding (horizontal partitioning across server instances). Uses BSON. Master/slave replication, auto failover. Can configure N replica sets.
Pluggable storage engine. By default uses mmap to file as storage engine.
Terminologies
-------------------------------------------
RDBMS MongoDB
-------------------------------------------
Table Collection
Row Document
Join Embedding and Linking
Can run arbitrary javascript functions server side. Easy to add columns.
Redis is a distributed in-memory key/value data structure store written in C. High performance. It supports data structures such as strings, hashes, lists, sets, etc. Master-slave replication, automatic failover.
It is used as database cache and message broker. It provides publish/subscribe messaging features as well, but may not be as sophisticated as other dedicated message brokers (like RabbitMQ, Kafka, etc).
Solr using Apache Lucene -
Written in Erlang. Document Oriented. Supports WebAPI.
A key/value store database
A key/value store database from LinkedIn. Big hash table like. Out of CAP theorem, it is AP system (sacrificing consistency, if need be).
AWS Lambda is an event-driven, serverless computing platform provided by Amazon. The purpose of Lambda, as compared to AWS EC2, is to simplify building smaller, on-demand applications that are responsive to events and new information. AWS targets starting a Lambda instance within milliseconds of an event. Node.js, Python and Java are all officially supported as of 2016, and other languages can be supported via call-outs.
AWS Lambda offers the perfect middle ground between IaaS and PaaS. It also effectively counters the growing threat of containers to its business by simplifying the task of running code in the cloud. It is Amazon’s way of delivering a microservices framework far ahead of its competitors.
Currently, there are three ways of running code in AWS cloud: Amazon EC2, Amazon ECS, and AWS Elastic Beanstalk. EC2 is a full-blown IaaS while ECS is the hosted container environment. Finally, Elastic Beanstalk is a PaaS layer. AWS Lambda forms the fourth service with the capability to execute code in the cloud. But it is unique in a sense that it is at the intersection of EC2, ECS, and Elastic Beanstalk.
A container instance can be any EC2 instance, running any distro (Amazon Linux, Ubuntu, CoreOS)
It just needs two extra software components: the Docker daemon and the AWS ECS agent. The ECS agent is open source (Apache license). You can check the ECS agent repo on GitHub.
There is no additional cost: you pay only for the EC2 resources. It only works on VPC.
Your containers will run on your EC2 instances (a bit like for Elastic Beanstalk, if you are familiar with that).
A task is an instantiation of a task definition. In other words, it is a group of related, running containers.
See:
Software-level containers use the Docker engine to isolate applications from one another. They share the same OS kernel but have separate /bin, /lib and application binaries, completely isolated. Each container can bind to the same TCP port number independently, without conflict. You can choose to do port forwarding of specific ports from container to host.
Lighter than virtual machines.
Developers can get going quickly by starting with one of the 13,000+ apps available on Docker Hub.
Docker originally used LinuX Containers (LXC), but later switched to runC (formerly known as libcontainer), which runs in the same operating system as its host. This allows it to share a lot of the host operating system resources. Also, it uses a layered filesystem (AuFS) and manages networking. Inspired by FreeBSD Jails.
AuFS is a layered file system, so you can have a read only part and a write part which are merged together. One could have the common parts of the operating system as read only (and shared amongst all of your containers) and then give each container its own mount for writing. So you could easily run thousands of containers on a host.
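The read-only base layer plus per-container writable layer can be modeled with Python's collections.ChainMap, where lookups fall through to the lower layer and writes land only in the top one. This is a conceptual sketch only: paths and contents are made up, and real layered filesystems also handle deletions (whiteouts), which are not modeled here.

```python
from collections import ChainMap

# Shared, read-only base image layer (path -> file content).
BASE_LAYER = {
    "/bin/sh": "shell binary",
    "/etc/hostname": "base-image",
}


def new_container(base_layer):
    """Give a container its own empty writable layer on top of the base."""
    return ChainMap({}, base_layer)


if __name__ == "__main__":
    c1 = new_container(BASE_LAYER)
    c2 = new_container(BASE_LAYER)

    # A write goes to c1's own top layer and shadows the base file.
    c1["/etc/hostname"] = "container-1"

    print(c1["/etc/hostname"])        # container-1 (from c1's writable layer)
    print(c2["/etc/hostname"])        # base-image (still reads the shared layer)
    print(BASE_LAYER["/etc/hostname"])  # base-image (base layer is untouched)
```

Since every container only stores its own changes, thousands of containers can share one copy of the common operating system files.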
Some commands:
apt-get install docker.io
# Use debian as the container OS just for kicks (it's also smaller than ubuntu)
docker run -i -t debian /bin/bash
# will present you after some downloading with the following root shell in the container. Yay.
root@482791e6cc1a:
# now uname still gives ubuntu, because this is a container and not a true VM..
docker pull ubuntu:14.10 # Remember to specify version; otherwise it will download all versions!!!
service docker.io restart # Restarts docker engine.
# docker run creates a container, configures it and runs a command in it. Every run creates a new container.
# So a lot of containers will be generated. Observe them with docker ps -a.
# You will want to periodically clean up all the cruft with docker rm.
# When you need to reconfigure a container, you need to "commit" it. That creates an image out of the container,
# that can then be reconfigured with a new run command. There is no other way.
# For a container, especially with networking configured, expect to do a lot of commits and runs.
In general, all data and logs in the container are transient. To persist certain mount points, do the following:
# Map data to persist in, via -v
root@host:~# mkdir /usr/local/test
root@host:~# docker run -i -t -v /usr/local/test:/tmp/test debian /bin/bash
root@ec08690b183a:/# mount
/dev/disk/by-uuid/77fb49ed-b208-4709-9a68-9cd6e6b3d7f4 on /tmp/test type ext4 (rw,relatime,errors=remount-ro,data=ordered)
root@ec08690b183a:/# touch /tmp/test/FROM_CONTAINER
root@ec08690b183a:/# ^P^Q # Come out of shell, but run docker shell in background.
root@ubuntu-14:~# ls /usr/local/test/
The container stops, when the shell stops
Use CTRL-P CTRL-Q to step out of the container shell and keep the shell running. Reattach with docker attach.
docker run -i -t -p 127.0.0.1:8082:80 debian /bin/bash
root@26e40f043fbb: apt-get update
...
root@26e40f043fbb: apt-get install apache2
root@26e40f043fbb: apachectl start
root@26e40f043fbb: ^P^Q
root@26e40f043fbb:/#
root@ubuntu-14:~# telnet 127.0.0.1 8082
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'
GET /
...
root@ubuntu-14:~# docker attach 26e40f043fbb
root@26e40f043fbb:
The container is a different container, even if run with the same parameters again
root@host: docker run -i -t -p 127.0.0.1:8081:80 debian /bin/bash
root@9f716f2329eb:/# apachectl start
bash: apachectl: command not found
The container's previous processes are gone after start
root@host:~# docker ps -a
root@host:~# docker start 26e40f043fbb
# Getting rid of unused images
docker rmi $(docker images --filter dangling=true --quiet)
See Also: https://www.ctl.io/developers/blog/post/docker-networking-rules/
Kubernetes just reached 1.0, has ~9k+ GH stars, and 400+ contributors (as of this posting). It also runs everywhere (AWS, GCP, Azure, on Mesosphere, bare metal), can be hosted (on GKE) and has significant third party contributors (CoreOS, RedHat).
You can get started running on Kubernetes in a single command:
kubectl run nginx --image=nginx --replicas=5
Kubernetes uses flannel, has its own load balancer, uses etcd, and has a CLI, API and configuration that are not the same as the Docker engine's. So it comes at a cost, in particular that it is a pain to install and configure. You will also spend more time learning about it, since its philosophy is different from Docker's. But once everything is set up, Kubernetes is a very good technology with lots of features.
Kubernetes is:
Docker Swarm is easy to install and use, and it is now built into Docker. It has its own service discovery and the same CLI as Docker, and it can be integrated easily with Docker in a production environment.
Mesos is proven to scale and effectively powers Twitter’s entire infrastructure. Marathon is a framework for orchestration, that can also handle docker.
Another option is to go with Kubernetes which can run as a framework on top of Mesos.
To host webserver behind NAT, check http://portforward.com to see tips on configuring your specific router.
See Also: * http://superuser.com/questions/121435/is-it-possible-to-host-a-web-server-from-behind-a-nat * http://stackoverflow.com/questions/26539727/giving-a-docker-container-a-routable-ip-address
Out of box platform:
Triton, Rancher, and some closed-source ones: Docker UCP, Tectonic, DCOS, ContainerX
-------------------------------------------------------------------------------------------
Clustering & App Orchestration:
Mesos/Marathon, Docker Swarm, Google Kubernetes, Nomad, Aurora, ECS
-------------------------------------------------------------------------------------------
Service Discovery: etcd, consul, zookeeper
-------------------------------------------------------------------------------------------
Storage Mgmt: | Networking: | Exec: | Image: | Repo Server:
Flocker, Portworx | CNM, CNI | Docker, rkt, | ACI, | Docker Hub,
| Flannel, etc| mesos | Docker Image | CoreOS Quay
-------------------------------------------------------------------------------------------
Host OS : Ubuntu, Debian, CoreOS, etc.
-------------------------------------------------------------------------------------------
Cloud Resource:AWS, Azure, GCE, OpenStack, etc. | Infra Orchestrate:
| Terraform, Heat
-------------------------------------------------------------------------------------------
Some framework or software providers on big data ...
Oldest hadoop support company. Cloudera is just as good as Hortonworks, but some of its components, like Cloudera Manager, are not open source.
More modern and 100% opensource hadoop support company.
MapR is another hadoop/bigdata support provider and also provides high quality free trainings.
Google data processing tools include Google Cloud Pub/Sub, Google Cloud Dataflow, Google BigQuery, and Google Cloud Dataproc. They are usually easier to use and offer fewer choices of tools for a specific use case (so less confusing). Spotify (online music company) is moving to the Google platform.
One claim is that AWS has “three” queuing systems and “two” storage solutions with different APIs and different quirks, while Google has just one of each and it nails the use cases for queuing and storage. This is not currently true: Google has Datastore, Cloud SQL, Bigtable, BigQuery and Cloud Storage. Each is intended for a different use case, as are Amazon’s offerings.
These are the default TCP ports used by the standard HDP Sandbox distribution:
Service URL
Sandbox Welcome Page http://host:8888
Ambari Dashboard http://host:8080
Ambari Welcome http://host:8080/views/ADMIN_VIEW/2.2.1.0/INSTANCE/#/
Hive User View http://host:8080/#/main/views/HIVE/1.0.0/AUTO_HIVE_INSTANCE
Pig User View http://host:8080/#/main/views/PIG/1.0.0/Pig
File User View http://host:8080/#/main/views/FILES/1.0.0/Files
SSH Web Client http://host:4200
Hadoop Configuration http://host:50070/dfshealth.html http://host:50070/explorer.html
The following table contains login credentials:
Service User Password
Ambari maria_dev maria_dev
Ambari admin admin thavaadmin
Linux OS root hadoop/thavamuni
EC2 - Runs virtual machines in the cloud. IaaS - Infrastructure as a Service. Google Compute Engine is the Google alternative.
EMR - Elastic Map Reduce - Amazon EMR is a managed cluster platform for running big data frameworks, such as Apache Hadoop and Apache Spark.
ECS - EC2 Container service aka Docker on AWS
* Your containers will run on your EC2 instances (a bit like for Elastic Beanstalk, if you are familiar with that)
* there is no additional cost: you pay only for the EC2 resources
* it only works on VPC ?? there is no console dashboard yet; you have to use the CLI or API
* for now, you can only start containers from public images hosted on the Docker Hub
* Allows you to easily run applications on a managed cluster of Amazon EC2 instances.
* EC2 instance (container instance) should run docker daemon and AWS agent.
Elastic Beanstalk.
AWS Elastic Beanstalk is a PaaS (Platform as a Service). The Google alternative is Google App Engine.
Users create applications and deploy them on AWS Elastic Beanstalk.
Application can use AWS services, including EC2, S3, SNS, CloudWatch, autoscaling, and Elastic Load Balancers, etc.
There is no additional charge for Elastic Beanstalk. You pay only for the underlying AWS resources.
Different container types (stacks) are supported. e.g. Apache Tomcat/Linux/Apache webserver; etc.
Can use predefined docker images as containers.
Lambda - AWS Lambda is a compute service. You provide the code (Java, Python, etc.) to run. You cannot directly access or configure the EC2 instance it runs on; the infrastructure is managed automatically. If you need to control the EC2 instance, use Elastic Beanstalk. A function is typically invoked through AWS SDK API calls made from an HTTP request handler, or as an event handler for changes to S3, DynamoDB, etc. Use the built-in CloudWatch monitoring of your Lambda functions to view and optimize request latencies.
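As a rough illustration of the event-handler style described above, here is a minimal sketch of a Python function reacting to an S3 change. The handler name lambda_handler, the bucket name and the object key are illustrative assumptions; the event layout (a "Records" list carrying s3.bucket.name and s3.object.key) follows the S3 notification format.

```python
import json


def lambda_handler(event, context):
    """Entry point Lambda would call; returns the S3 objects named in the event."""
    touched = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        touched.append(f"s3://{bucket}/{key}")
    # Anything printed ends up in the function's CloudWatch log stream.
    print(json.dumps({"objects": touched}))
    return {"count": len(touched), "objects": touched}


# Local smoke test with a hand-written (hypothetical) event; no AWS access needed.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "example-bucket"}, "object": {"key": "logs/a.txt"}}}
    ]
}
result = lambda_handler(sample_event, None)
```

Because the handler is a plain function, it can be exercised locally like this before it is uploaded.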
S3 - Scalable Storage in the Cloud.
Cloudfront - It is a content delivery network (CDN) to cache web media using global proxy servers. Video Streaming. Site acceleration.
Amazon Elastic File System (EFS) provides scalable file storage for use with Amazon EC2.
Amazon Glacier is an extremely low-cost storage service.
Amazon RDS - Provides RDBMS as service. Can choose MySQL/Oracle/Postgres/Amazon Aurora. Competitors include Google Cloud SQL, Rackspace Cloud Databases, Azure SQL, HP Cloud, ClearDB, etc.
Amazon DynamoDB is a fully managed NoSQL database service. It was originally intended as a highly available key-value store, but now supports multiple columns, secondary indexes, etc. Can be compared to Google Bigtable and Hadoop HBase. Does synchronous replication.
Redshift - Amazon Redshift is a Postgres-compatible RDBMS for data warehousing and analytics.
ElastiCache is an Amazon web service for in-memory caching in the cloud, i.e. Redis or memcached on Amazon.
CloudWatch: Monitor Resources and Applications
CloudFormation: Create and Manage Resources with Templates
OpsWorks: Automate Operations with Chef
Trusted Advisor: Optimize Performance and Security
SQS - Simple Queue Service
SNS - Simple Notification Service. Delivers messages to other distributed services such as SQS, AWS Lambda or HTTP endpoints.
Kinesis - Realtime data streaming
Typical Architecture:
------ +-----------> Spark-Streaming ---------------->+----------------+
Kafka >===| | Cassandra DB |===> Realtime/Batch View
------ +-----------> HDFS --> Spark-Processing ------>+----------------+
See http://spark.apache.org/docs/latest/programming-guide.html#overview
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
from pyspark import SparkContext, SparkConf
appName = "MyApp" # Any descriptive name (placeholder).
master = "local" # Use "local" for local testing.
conf = SparkConf().setAppName(appName).setMaster(master)
#
# Possible values of master parameter:
# local - Use a single local worker thread.
# local[5] - Use 5 local worker threads.
# local[*] - Use as many worker threads as there are local cores.
# spark://HOST:PORT - Connect to a Spark standalone cluster.
# mesos://HOST:PORT - Connect to a Mesos cluster.
#
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# distData = sc.parallelize(data, 10) # Ask for 10 partitions.
# distData.reduce(lambda a, b: a + b) # Computes the sum of all elements.
lines = sc.textFile("data.txt") # Each line of the file becomes one RDD element.
# This RDD is just a pointer; nothing is loaded into memory yet.
lineLengths = lines.map(lambda s: len(s)) # Lazy evaluation. This is again just a specification. No action.
totalLength = lineLengths.reduce(lambda a, b: a + b) # reduce is an action, so this triggers the computation.
# Note that some operations, such as join, are only available on RDDs of key-value pairs.
Serializing/unserializing a collection of key, value pairs could not be easier:
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
Caveats about closure:
counter = 0
rdd = sc.parallelize(data) # say data = [ 10, 20, 30 ]
# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter) # increment_counter(10); increment_counter(20); etc. are invoked on the executors.
print("Counter value: ", counter) # Each executor modifies its own copy of the global. Result is undefined when executed on a cluster.
Solution: Do not mutate globals from closures; use an Accumulator instead.
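The accumulator approach can be sketched as follows. This is a minimal sketch that assumes an existing SparkContext is passed in as sc (as in the pyspark shell); the function names are made up for illustration, while accumulator(), add(), value, foreach() and reduce() are the standard PySpark calls.

```python
def sum_with_accumulator(sc, data):
    """Sum `data` with an accumulator instead of a mutated global."""
    total = sc.accumulator(0)            # created on the driver
    rdd = sc.parallelize(data)
    rdd.foreach(lambda x: total.add(x))  # executors may only add to it
    return total.value                   # only the driver reads the value


def sum_with_reduce(sc, data):
    """Same result without shared state: let reduce return the value."""
    return sc.parallelize(data).reduce(lambda a, b: a + b)
```

With data = [10, 20, 30] both functions should return 60. When the aggregate is the only result needed, the reduce form is usually simpler; accumulators fit better for side counters such as the number of malformed records.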
Also See: http://spark.apache.org/examples.html
Some examples from pyspark shell
# Search through error msgs in log files.
from pyspark.sql import Row
from pyspark.sql.functions import col
textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as a list of Row objects
errors.filter(col("line").like("%MySQL%")).collect()
# Simple data operations
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername&password=yourPassword"
df = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Prints the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
You can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
less_ten = udf(lambda s: s < 10, BooleanType())
# Initialize data as a list of some arbitrary tuples.
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age') )
lambdaDF = dataDF.filter(less_ten(dataDF.age))
# Note: dataDF.age is Column type, not an integer constant.
lambdaDF.show()
lambdaDF.count()
Spark cluster can be of any one of these types:
Using the Spark Standalone cluster manager means using the scripts to start the master and slaves manually. When the slaves are started they connect to the master using startup options; that is how the master knows the slaves exist:
./sbin/start-master.sh
Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers to it, or pass as the master argument to SparkContext. You can also find this URL on the master’s web UI, which is http://localhost:8080 by default.
Similarly, you can start one or more workers and connect them to the master via:
./sbin/start-slave.sh <master-spark-URL>
Once you have started a worker, look at the master’s web UI (http://localhost:8080 by default). You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
To run an application on the Spark cluster, simply pass the spark://IP:PORT URL of the master as the master argument to the SparkContext constructor.
To run an interactive Spark shell against the cluster, run the following command:
./bin/spark-shell --master spark://IP:PORT
You can also pass the option --total-executor-cores <numCores> to control the number of cores that spark-shell uses on the cluster.
For running with MESOS, See http://spark.apache.org/docs/latest/running-on-mesos.html
To submit a job to a Spark cluster, use the spark-submit script:
# Run application locally on localhost on 8 cores
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /path/to/examples.jar 100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://207.184.161.138:7077 \
--executor-memory 20G --total-executor-cores 100 /path/to/examples.jar 1000
See http://spark.apache.org/docs/latest/submitting-applications.html
If you are connecting to MySQL from spark-shell, add the connector jar when starting the shell:
# spark-shell --jars mysql-connector.jar
http://spark.apache.org/docs/latest/cluster-overview.html
See original white paper: https://www2.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
Uses micro-batching where data from t0-t1 is constructed as a DStream and processed.
Easier monitoring and interactive queries
Spark Streaming can read data from HDFS, Flume, Kafka, Twitter and ZeroMQ. You can also define your own custom data sources.
Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
See https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html for better fault tolerance with spark streaming using Write-ahead-log.
Receivers run as long-running tasks on the executors and divide the input into blocks held in memory. By default these in-memory blocks are also replicated to a second executor. Every batch interval, the driver launches tasks to process the blocks.
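The micro-batching idea can be illustrated without Spark at all. The following is a plain-Python sketch, not Spark code: the event list, the function names and the 2-second interval are assumptions chosen for illustration. Events are grouped by the batch interval they fall into, and each group is then processed like a small batch job.

```python
from collections import defaultdict


def micro_batches(events, batch_interval):
    """Group (timestamp, value) events into consecutive fixed-length batches.

    Batch i holds events with i*batch_interval <= timestamp < (i+1)*batch_interval.
    Intervals without events are simply omitted in this sketch.
    """
    batches = defaultdict(list)
    for timestamp, value in events:
        batches[int(timestamp // batch_interval)].append(value)
    return [batches[i] for i in sorted(batches)]


def word_counts(lines):
    """The per-batch job: an ordinary word count over one batch of lines."""
    counts = defaultdict(int)
    for line in lines:
        for word in line.split():
            counts[word] += 1
    return dict(counts)


# Hypothetical input stream: (arrival time in seconds, text line).
events = [(0.2, "a b"), (1.7, "b"), (2.1, "c a"), (3.9, "a"), (4.0, "b b")]
results = [word_counts(batch) for batch in micro_batches(events, batch_interval=2)]
```

Each entry of results corresponds to one batch interval, which is why the per-batch code can look the same as regular batch processing.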
How is streaming code different from regular processing?
# The API is almost the same except for the initialization of the input stream.
# spark-shell --driver-memory 1G
scala >
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)
val wordsFlatMap = lines.flatMap(_.split(" "))
....
....
# Compare this with
lines = sc.textFile("data.txt") # Default parallelization parameter is newline delimiter.
OR
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# In regular processing the size of input is known unlike in streaming.
Following things are pre-imported:
Note: databricks uses dbfs - Databricks File system which is not opensource. Similar to HDFS. Do not bother to dig much into this.
To execute external shell commands do
scala> import sys.process._
scala> "ls -l" !
... output ...
result: Int = 0
scala> val result = "ls -al" ! # Stores 0 to result.
scala> val result = "ls -al" !! # Store output string into result.
result: String = " .... output ... "
val result = "ps auxw" #| "grep http" #| "wc -l" ! # #| is scala's pipeline operator.
# Other process operators supported by the scala shell: #< #> #>> #&& #||
This is a scala shell with the spark libraries already loaded.
Use spark-shell (which is a scala shell) only for incremental development/testing. For running application, package application as .jar file and use spark-submit for deployment.
https://www.youtube.com/watch?v=WyfHUNnMutg
Also See: https://github.com/hadooparchitecturebook/hadoop-arch-book
No spark shuffle block can be greater than 2 GB !!! Solution: increase the number of partitions and decrease the skew, randomize the keyname.
Common Mistakes and Solutions:
In Spark SQL, Increase spark.sql.shuffle.partitions
In regular spark application, use rdd.repartition() or rdd.coalesce()
Approx 128MB per partition works.
Shuffles are to be avoided
Prefer reduceByKey over groupByKey
Avoid Cartesians
Examine your Spark DAG execution plan.
Use treeReduce instead of reduce.
Use Complex Types for Reduce Object – not limited to just tuple. You can use hash map, set, whatever.
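The partition-sizing advice above (roughly 128 MB per partition, no shuffle block above 2 GB) comes down to simple arithmetic. Below is a small sketch; the helper names and the 500 GB shuffle volume are assumptions for illustration, and the check assumes data is spread evenly (no skew).

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def suggest_partitions(total_shuffle_bytes, target_partition_bytes=128 * MB):
    """Number of partitions so that each holds roughly target_partition_bytes."""
    return max(1, math.ceil(total_shuffle_bytes / target_partition_bytes))


def max_block_ok(total_shuffle_bytes, num_partitions, limit_bytes=2 * GB):
    """True if an evenly spread shuffle keeps every partition under the limit."""
    return total_shuffle_bytes / num_partitions < limit_bytes


shuffle_size = 500 * GB                    # hypothetical shuffle volume
n = suggest_partitions(shuffle_size)       # 500 GB / 128 MB per partition
too_few = max_block_ok(shuffle_size, 200)  # 2.5 GB per partition: over the limit
enough = max_block_ok(shuffle_size, n)
```

The resulting count is the kind of value one would try for spark.sql.shuffle.partitions or pass to rdd.repartition(); with skewed keys some partitions can still exceed the limit.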
While MR has just two steps (map and reduce), a DAG can have multiple levels that form a tree structure. For example, when executing a SQL query, a DAG is more flexible because it offers more functions such as map, filter, union, etc. DAG execution is also faster, as in the case of Apache Tez (which succeeds MR), because intermediate results are not written to disk.
Users can control two other aspects of RDDs: persistence and partitioning. Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage). They can also ask that an RDD’s elements be partitioned across machines based on a key in each record. This is useful for placement optimizations, such as ensuring that two datasets that will be joined together are hash-partitioned in the same way.
Spark exposes RDDs through a language-integrated API similar to DryadLINQ[31] and FlumeJava [8], where each dataset is represented as an object and transformations are invoked using methods on these objects.
Common Pattern: Back unreliable data with Kafka for 100% durability. (Kafka has built-in data backup capability ???)
In Python you can simply try to filter globals by type:
def list_rdds():
    from pyspark import RDD
    return [k for (k, v) in globals().items() if isinstance(v, RDD)]
list_rdds()
# []
rdd = sc.parallelize([])
list_rdds()
# ['rdd']
In Scala REPL you should be able to use $intp.definedTerms / $intp.typeOfTerm in a similar way.
If you want to view the content of an RDD, one way is to use collect():
myRDD.collect().foreach(println)
That’s not a good idea, though, when the RDD has billions of lines. Use take() to take just a few to print out:
myRDD.take(n).foreach(println)
You should pick any two of Consistency, Availability and Partition Tolerance:
In big data NoSQL environment, consistency guarantee is very expensive since few node failures and partitioning is quite common. Note that single node hardware/OS/software failure is also equivalent to partitioning. Hence AP systems are popular compromising on Consistency by providing ‘Eventual Consistency’.
Zookeeper favours consistency and disables the smaller partition from operating. (Intelligent partition handling, but technically not fully partition tolerant!!!).
HDFS also chooses consistency. Writes are more expensive than reads. Decent availability till 3 datanodes failure. In case of partitioning, nodes available from namenode side survive, others rendered useless. Partial partition tolerance.
All RDBMS systems.
Gives up availability to be consistent but usually works well when there are few nodes down / Partitioning. Note: One can argue CA and CP are almost same since CP sacrifices Availability only during partitioning.
Examples include:
Gives up partial consistency (eventually consistent, may be) in order to be available (or for lower latency).
Examples Include:
Daniel Abadi suggests PACELC: in case of Partitioning, choose between Availability and Consistency; Else choose between lower Latency and Consistency.
i.e. You don’t give up consistency just for availability, sometimes you do just for lower latency. The system could be PA-EL (gives up consistency) or PA-EC (gives up consistency only when there is partitioning not otherwise), etc.
See Also:
Hive files can be stored in any one of following formats:
The default format can be set as follows from Hive:
hive> SET hive.default.fileformat=Orc
ORC is a self-describing type-aware columnar file format designed for Hadoop workloads.
It is optimized for large streaming reads and with integrated support for finding required rows fast.
ORC supports the complete set of types in Hive, including the complex types: structs, lists, maps, and unions
Note that csv files are not self-describing.
Following Hive SQL prepares hive data warehouse repository to store a new table in ORC format:
create table my_orc_table( col1 String, col2 bigint) stored as orc
To dump an ORC file, use: hive --orcfiledump <location-of-orc-file>
From Apache Spark, you can:
Create RDD using Hive SQL : val my_rdd = hiveContext.sql("select * from my_orc_table") - Lazy action.
Store RDD directly into any ORC file - my_rdd.saveAsOrcFile("my_orc_file") - Triggers action ?
Store RDD into hive temporary table (caches to prevent recomputing RDD later): my_rdd.registerTempTable("my_temp_table") - Lazy hint to cache later.
hiveContext.sql("load data inpath 'my_orc_file' into table my_orc_table")
Also See: * http://code.tutsplus.com/tutorials/vagrant-what-why-and-how--net-26500
Synopsis:
$ gem install vagrant
$ mkdir -p ~/Vagrant/test
$ cd ~/Vagrant/test
$ vagrant box add precise32 http://files.vagrantup.com/precise32.box
# You see here the argument "precise32" which is a nickname for the URL.
# You can now create an instance, which will download this .box.
$ vagrant init precise32
$ vagrant up
# It will now be running. Easy! If you want to get into this instance, via SSH, use this command:
$ vagrant ssh
$ vagrant list-commands
There is Vagrantfile where you can configure:
Here is an example Vagrantfile:
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant::Config.run do |config|
  config.vm.box = "precise32"
  config.vm.box_url = "http://files.vagrantup.com/precise32.box"
  # Assign this VM to a host-only network IP, allowing you to access it
  # via the IP. Host-only networks can talk to the host machine as well as
  # any other machines on the same network, but cannot be accessed (through this
  # network interface) by any external networks.
  config.vm.network :hostonly, "192.168.33.10"
  # Set the default project share to use nfs
  config.vm.share_folder("v-web", "/vagrant/www", "./www", :nfs => true)
  config.vm.share_folder("v-db", "/vagrant/db", "./db", :nfs => true)
  # Forward a port from the guest to the host, which allows for outside
  # computers to access the VM, whereas host only networking does not.
  config.vm.forward_port 80, 8080
  # Set the Timezone to something useful
  config.vm.provision :shell, :inline => "echo \"Europe/London\" | sudo tee /etc/timezone && dpkg-reconfigure --frontend noninteractive tzdata"
  # Update the server
  config.vm.provision :shell, :inline => "apt-get update --fix-missing"
  # Enable Puppet
  config.vm.provision :puppet do |puppet|
    puppet.facter = { "fqdn" => "local.pyrocms", "hostname" => "www" }
    puppet.manifests_path = "puppet/manifests"
    puppet.manifest_file = "ubuntu-apache2-pgsql-php5.pp"
    puppet.module_path = "puppet/modules"
  end
end
Here is an example Puppet manifest file for installing Apache and configuring its default parameters. It refers to the predefined Puppet module ‘apache’:
include apache
$docroot = '/vagrant/www/pyrocms/'
$db_location = "/vagrant/db/pyrocms.sqlite"
# Apache setup
class {'apache::php': }
apache::vhost { 'local.pyrocms':
  priority => '20',
  port => '80',
  docroot => $docroot,
  configure_firewall => false,
}
a2mod { 'rewrite': ensure => present; }
The manifests are OS independent. The module has been written to capture OS specific installation details.
For creating a CoreOS cluster using Vagrant, see: https://coreos.com/os/docs/latest/booting-on-vagrant.html
Synopsis
$ git clone https://github.com/coreos/coreos-vagrant.git
$ cd coreos-vagrant
$ curl 'https://discovery.etcd.io/new?size=3' # Prints a discovery URL, e.g. https://discovery.etcd.io/e850b12eb5569631101bf5ac7db2e22f
# The e85...22f value is called token.
$ cp user-data.sample user-data
$ vi user-data # Edit file to provide config in cloud-config format. Replace the <token> with the token value.
# The values of $public_ipv4 and $private_ipv4 are derived using Vagrant file config.
$ vi config.rb # Set $num_instances = 3
$ vagrant up
$ vagrant status
--- running status of 3 machines --
$ ssh-add ~/.vagrant.d/insecure_private_key
$ vagrant ssh core-01 -- -A # Connects to single machine in cluster
# Your Vagrantfile should copy your cloud-config file to /var/lib/coreos-vagrant/vagrantfile-user-data
$ ls -l /var/lib/coreos-vagrant/vagrantfile-user-data
$ fleetctl list-machines
# Lists machine-ids and status
$ cat > hello-fleet.service
[Service]
ExecStart=/usr/bin/bash -c "while true; do echo 'Hello Fleet'; sleep 1; done"
$ fleetctl start hello-fleet.service
Job hello-fleet.service scheduled to 517d1c7d.../172.17.8.101
$ etcdctl set first-etcd-key "Hello World" # Set a config parameter in one machine. Reflects in all cluster...
$ fleetctl ssh cb35b356 etcdctl get first-etcd-key
Hello World
See Also:
NYSE generates 1 TB data per day. Facebook generates 20 TB per day. The internet archive stores around 20TB per month !!!
See O’Reilly MapReduce Design Patterns.
See following figure: Image Source: http://stackoverflow.com/questions/22141631/what-is-the-purpose-of-shuffling-and-sorting-phase-in-the-reducer-in-map-reduce :
The record reader translates an input split generated by input format into records.
In the mapper, user-provided code is executed on each key/value pair from the record reader to produce zero or more new key/value pairs, called the intermediate pairs
The combiner, an optional localized reducer, can group data in the map phase.
The reduce task starts with the shuffle and sort step. The shuffle is nothing but the process of receiving/aggregating data from different mappers and assign it to the relevant reduce task for the corresponding key. This step takes the output files written by all of the partitioners and downloads them to the local machine in which the reducer is running. Not customizable except providing Comparator object to tell how to sort.
The reducer takes the grouped data as input and runs a reduce function once per key grouping. The function is passed the key and an iterator over all of the values associated with that key. A wide range of processing can happen in this function, as in many of our patterns. The data can be aggregated, filtered, and combined in a number of ways. Once the reduce function is done, it sends zero or more key/value pair to the final step, the output format. Like the map function, the reduce function will change from job to job since it is a core piece of logic in the solution. The output format translates the final key/value pair from the reduce function and writes it out to a file by a record writer. By default, it will separate the key and value with a tab and separate records with a newline character.
* Input (Key, Value)
* Output: List(Key, Value) # For each input pair you can produce a list of zero or more output pairs. Keys must be comparable.
* Projections, Filtering, Transformation
Example: Input: [ (1, line1), (2, line2), ... ] and output: [ (word1, N1), (word2, N2), ... ]
* Input: List(Key, Value)
* Output: Sort(Partition(List(Key, List(Value ))))
* Input: List(Key, List(Value))
* Output: List(Key, Value )
* Aggregation
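The map, combine, shuffle/sort and reduce stages described above can be traced with a small in-memory word count. This is a plain-Python sketch of the data flow, not Hadoop code; the function names, the toy partitioner and the sample input are assumptions made for illustration.

```python
from collections import defaultdict
from itertools import groupby


def mapper(key, line):
    """Map: (line_number, line) -> zero or more intermediate (word, 1) pairs."""
    for word in line.split():
        yield word.lower(), 1


def combiner(pairs):
    """Optional local reduce over the output of a single mapper."""
    local = defaultdict(int)
    for word, n in pairs:
        local[word] += n
    return list(local.items())


def partition(key, num_reducers):
    """Toy deterministic partitioner: decides which reducer receives a key."""
    return sum(key.encode("utf-8")) % num_reducers


def shuffle_and_sort(mapper_outputs, num_reducers):
    """List(Key, Value) -> per reducer, a sorted List(Key, List(Value))."""
    partitions = [[] for _ in range(num_reducers)]
    for output in mapper_outputs:
        for key, value in output:
            partitions[partition(key, num_reducers)].append((key, value))
    grouped = []
    for part in partitions:
        part.sort(key=lambda kv: kv[0])
        grouped.append([(k, [v for _, v in g])
                        for k, g in groupby(part, key=lambda kv: kv[0])])
    return grouped


def reducer(key, values):
    """Reduce: (Key, List(Value)) -> (Key, Value); here an aggregation."""
    return key, sum(values)


# Two input splits, each handled by its own mapper.
splits = [[(1, "the quick fox"), (2, "the lazy dog")], [(3, "The fox")]]
mapper_outputs = [combiner(kv for k, line in split for kv in mapper(k, line))
                  for split in splits]
result = {}
for reducer_input in shuffle_and_sort(mapper_outputs, num_reducers=2):
    for key, values in reducer_input:
        word, total = reducer(key, values)
        result[word] = total
```

Note how the combiner already collapses the two occurrences of "the" in the first split, so less data crosses the shuffle.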
It is customizable. If there are many jobs running, delaying the reduce job until all mappers finish is useful. Otherwise starting as soon as possible is better.
Depending on the problem, the reducer may need all mappers to finish first, or it can do incremental work. See http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop
Set mapreduce.job.reduce.slowstart.completedmaps to a value between 0.0 and 1.0 in mapred-site.xml.
Mappers don’t have to communicate with each other.
The input is divided into file splits (e.g. 1 GB), which are stored as blocks (e.g. 64 MB) and read by a RecordReader such as LineRecordReader. Because a line can straddle a split boundary, a reader whose split is not the first one always ignores its first line; it relies on the reader of the previous split to read past the end of that split and finish the last line.
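The boundary rule can be checked with a small simulation. This is a simplified plain-Python sketch of how a line-oriented reader might treat split boundaries, not the actual Hadoop implementation; the function name, the sample data and the 8-byte split size are assumptions.

```python
def read_split(data, start, end):
    """Return the lines owned by the split covering bytes [start, end)."""
    pos = start
    if start != 0:
        # Not the first split: skip the first (possibly partial) line,
        # because the reader of the previous split is responsible for it.
        newline = data.find(b"\n", pos)
        pos = len(data) if newline == -1 else newline + 1
    records = []
    # Keep reading while a line starts at or before `end`, which means the
    # last line may run past the end of this split.
    while pos <= end and pos < len(data):
        newline = data.find(b"\n", pos)
        next_pos = len(data) if newline == -1 else newline + 1
        records.append(data[pos:next_pos].rstrip(b"\n"))
        pos = next_pos
    return records


data = b"alpha\nbeta\ngamma\ndelta\n"
split_size = 8  # tiny on purpose, so that lines straddle split boundaries
splits = [(s, min(s + split_size, len(data)))
          for s in range(0, len(data), split_size)]
per_split = [read_split(data, s, e) for s, e in splits]
all_records = [r for records in per_split for r in records]
```

Every line is read exactly once even though "beta" and "gamma" both cross a split boundary.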
Yes. There is support for using scripting with Hadoop Map Reduce stages
bin/hadoop jar hadoop-streaming.jar -input in-files -output out-dir -mapper mapper.sh -reducer reducer.sh
mapper.sh: sed -e 's/ /\n/g' | grep .
reducer.sh: uniq -c | awk '{print $2 "\t" $1}'
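The same mapper and reducer logic can be written in Python. The sketch below is an assumption about how one might structure it: the function names are made up, and the sorted() call stands in for the sort that the framework performs between the two stages.

```python
from itertools import groupby


def map_lines(lines):
    """Mapper: emit one 'word<TAB>1' record per word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(sorted_lines):
    """Reducer: input arrives sorted by key, so equal words are adjacent."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(n) for _, n in group)}"


# Local dry run of the two stages.
mapped = sorted(map_lines(["a b a", "b c"]))
reduced = list(reduce_lines(mapped))
```

To use these with hadoop-streaming.jar, each function would be wrapped in a small script that reads sys.stdin and prints the yielded records, and the scripts would be passed via -mapper and -reducer.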
You can have both using rpy2 library of python. See http://rpy.sourceforge.net/rpy2/doc-2.1/html/introduction.html
http://www.slideshare.net/ikewu83/dean-keynoteladis2009-4885081
Some reference times from 2011
L1 cache reference 0.5 ns
Branch mispredict 5 ns
L2 cache reference 7 ns
Mutex lock/unlock 100 ns (25)
Main memory reference 100 ns
Compress 1K bytes with Zippy 10,000 ns (3,000)
Send 2K bytes over 1 Gbps network 20,000 ns
Read 1 MB sequentially from memory 250,000 ns
Round trip within same datacenter 500,000 ns (.5 ms)
Disk seek 10,000,000 ns (10 ms) (SSD Disk around 0.1ms ?)
Read 1 MB sequentially from network 10,000,000 ns
Read 1 MB sequentially from disk 30,000,000 ns (30 ms)
Send packet CA->Netherlands->CA 150,000,000 ns (150 ms)
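To get a feel for these numbers, one can scale them up to larger transfers. The sketch below uses only values from the table above; the dictionary keys and the helper name are made up for illustration.

```python
# Latencies in nanoseconds, copied from the table above.
NS = {
    "read_1mb_memory": 250_000,
    "read_1mb_network": 10_000_000,
    "disk_seek": 10_000_000,
    "read_1mb_disk": 30_000_000,
}


def seconds_to_read(megabytes, ns_per_mb):
    """Time to read `megabytes` sequentially at the given per-MB cost."""
    return megabytes * ns_per_mb / 1e9


one_gb = 1024  # in MB
from_memory = seconds_to_read(one_gb, NS["read_1mb_memory"])    # about 0.26 s
from_network = seconds_to_read(one_gb, NS["read_1mb_network"])  # about 10 s
from_disk = seconds_to_read(one_gb, NS["read_1mb_disk"])        # about 31 s

# One disk seek costs as much as reading a third of a megabyte sequentially.
mb_per_seek = NS["disk_seek"] / NS["read_1mb_disk"]
```

Reading 1 GB sequentially is therefore roughly 120 times slower from disk than from memory with these figures, which is one reason in-memory processing pays off.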
SAN - Storage area network. A SAN is a high-speed network that connects storage devices and servers. It can be seen as a more sophisticated NAS that allows other computers to treat its partitions like DAS. It has multiple network ports to boost speed. It is more complex to configure, so more error prone. It provides raw block-level storage access. The interface is Fibre Channel, iSCSI or InfiniBand.
NAS - Network attached storage. Typically has an IP address and runs a custom OS as a file server. It shares files over the network; it does not expose a raw storage device over the network.
DAS - Directly attached storage. Highest speed, but cannot share the storage with other computers (without additional software support). DAS is also preferred for big data since it can scale out horizontally. For exclusive access to a disk (e.g. RDBMS storage), DAS should be preferred. Note: Is disk recovery in NAS simpler compared to DAS ?
- Fibre Channel: Uses the Fibre Channel protocol. The cable can even be copper. Can multiplex IO and network traffic using the same adapter.
Point-to-point between host and storage array. Speed in the range of 1.6 GB/sec, 3.2 GB/sec or 6.4 GB/sec.
Thunderbolt
SAS - Serially Attached SCSI (300 MB/sec or 600 MB/sec or 1.2GB/sec speed for 1/2/3rd generation)
SATA I, SATA II, SATA 3.0 (6 Gbps)
SCSI
- Ethernet (1, 10 or 100 Gbps, i.e. roughly 100 MB/sec, 1 GB/sec or 10 GB/sec)
- Storage device may be connected thru ethernet using iSCSI protocol. Used to carry SCSI traffic over IP networks. Exposes blocklevel access over network compared to NFS which exposes file level access.
USB
Note: DRAM can transfer data at about 20 GB/sec.
A 1 Gbps network port can push about 125 MB/sec of data. This is about the same as the read rate of a single IDE disk.
A 10 Gbps network port can push about 1 GB/sec of data. This is about the same as 10 attached RAID disks, which together can read about 1 GB/sec.
A 100 Gbps network port can push about 10 GB/sec of data. This is about 100 disks each with 100 MB/sec read speed, or 25 disks each with 400 MB/sec read speed; i.e. such a network can sustain around 25 computers each pushing around 400 MB/sec !!! It is still just half the speed of a single DRAM memory channel !!!
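The conversions behind these comparisons can be reproduced in a few lines. This sketch computes raw line rates (8 bits per byte, 1 MB taken as 10**6 bytes, protocol overhead ignored), so its results are somewhat higher than the rounded figures quoted in these notes; the function names are made up for illustration.

```python
def gbps_to_mb_per_sec(gbps):
    """Raw line rate in MB/sec for a link speed given in Gbps."""
    return gbps * 1000 / 8


def disks_to_saturate(gbps, disk_mb_per_sec):
    """How many disks reading at disk_mb_per_sec it takes to fill the link."""
    return gbps_to_mb_per_sec(gbps) / disk_mb_per_sec


rates = {g: gbps_to_mb_per_sec(g) for g in (1, 10, 100)}
slow_disks = disks_to_saturate(100, 100)  # disks at 100 MB/sec each
fast_disks = disks_to_saturate(100, 400)  # disks at 400 MB/sec each
dram_fraction = gbps_to_mb_per_sec(100) / 20_000  # versus DRAM at 20 GB/sec
```

At raw line rate a 100 Gbps link corresponds to 12,500 MB/sec, i.e. 125 disks at 100 MB/sec; rounding the link down to about 10 GB/sec gives the 100-disk and 25-disk figures.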
Following tips will be useful in setting up and troubleshooting cluster machines.
netcat and nc are aliases.
Usage
$ netcat localhost 80 # TCP connect like telnet on port 80
$ netcat -u localhost 80 # You can send UDP packet instead of TCP.
$ netcat -z -v localhost 1-1000 # Scan all ports upto 1000. nmap tool may be even better.
$ netcat -l 4444 # Listen on TCP port 4444. Act like a server. It will exit after 1 session.
# You can interact with connection using stdin.
$ netcat -l 4444 > received_file # Receive file through netcat. Wait till some one sends it.
$ netcat localhost 4444 < sendfile # Send file through netcat.
$ netcat -l 8888 < index.html # Act like simple webserver, which dumps html as soon as it receives a connection.
# http://localhost:8888 will show this html.
$ while true; do nc -l 8888 < index.html; done # Keep serving the same page again and again.
$ nc -lk 8888 < index.html # It does the same thing as above. Listen in a loop.
$ printf "GET / HTTP/1.0\r\n\r\n" | nc host.example.com 80 # Get http page from server. printf interprets \r\n; plain echo -n may not.
$ nc localhost 25 << EOF # Act like smtp client
HELO host.example.com
MAIL FROM: <user@host.example.com>
RCPT TO: <user2@host.example.com>
DATA
Body of email.
.
QUIT
EOF
See Also: man nc
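When netcat is not installed on a cluster machine, the two most common uses above (listen and serve one payload, connect and read) can be approximated with Python's standard socket module. This is a minimal sketch; the function names and the payload are assumptions, and it talks only to the loopback interface.

```python
import socket
import threading


def serve_once(server_sock, payload):
    """Like `nc -l PORT < file`: accept one connection, send payload, close."""
    conn, _ = server_sock.accept()
    with conn:
        conn.sendall(payload)


def fetch(host, port):
    """Like `nc HOST PORT`: connect and read until the peer closes."""
    chunks = []
    with socket.create_connection((host, port), timeout=5) as sock:
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks)


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

worker = threading.Thread(target=serve_once,
                          args=(server, b"<html>hello</html>\n"))
worker.start()
received = fetch("127.0.0.1", port)
worker.join()
server.close()
```

Like `nc -l` without -k, the server side here handles a single session and then exits.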
Oracle RAC: Database servers running on different machines share large disk arrays connected by a SAN (i.e. high-speed network switches and Fibre Channel cables). Hadoop uses many computers, each with its own DAS (directly attached storage).
There is no shortage of SQL on Hadoop offerings, and each Hadoop distributor seems to have its preferred flavor.
The list begins with the original SQL-on-Hadoop engine, Apache Hive, which was created at Facebook and is now backed by Hortonworks with its Stinger initiative.
In 2012, Cloudera rolled out the first release of Apache Impala to great fanfare, while lately MapR has been pushing the schema-less bounds of SQL querying with Apache Drill, which is based on Google’s Dremel.
But the prospective SQL-on-Hadoop user may also want to check out other offerings, including those from mega-vendors like: BigSQL (backed by IBM), Big Data SQL (backed by Oracle), and Vertica SQL on Hadoop (backed by Hewlett-Packard). Others in the race include Presto (created by Facebook, now backed by Teradata), Vortex (backed by Actian), Apache HAWQ (backed by Pivotal) and Apache Phoenix. Last but not least is Spark SQL, the SQL-enabling component of Apache Spark that is quickly gaining momentum. Also Microsoft Polybase.