
Spark Word Count program in Python

Here is the word count program in Python using Spark (PySpark) and Hadoop HDFS. In this tutorial, you will learn how to process data in Spark using RDDs, how to move a file into HDFS, and how to read that file for Spark processing via a command-line argument.

Note: Make sure Hadoop is installed on your system and running. You can check this with the 'jps' command in your terminal. (If Hadoop is not installed, you can follow a step-by-step guide from here.)

1. Python code to count the occurrences of a word in a file.

from __future__ import print_function
import sys
import findspark
findspark.init()  # locate the local Spark installation before importing pyspark
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    # Read the input file (local or HDFS path) and keep only the text of each line
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    # Split each line into words, pair each word with 1, and sum the counts per word
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

2. Move the word file to HDFS:
    hadoop fs -put /home/username/file/path/words.txt /hdfs/path

3. Run the Python code:
    python word_count.py hdfs:///usr/words.txt

4. That's it! The output is returned with each word and its number of occurrences.

Explain the Hadoop Ecosystem and briefly describe its components.

Hadoop is a framework that deals with Big Data, but unlike other frameworks it is not a single tool: it has its own family of projects for processing different things, tied together under one umbrella called the Hadoop Ecosystem.
Fig. Hadoop Ecosystem

1) SQOOP : SQL + Hadoop = SQOOP
        When we import any structured data from a table (RDBMS) into HDFS, a file is created in HDFS which we can process either with a MapReduce program directly or with HIVE or PIG. Similarly, after processing data in HDFS we can store the processed structured data back into another table in the RDBMS by exporting it through SQOOP.

2) HDFS (Hadoop Distributed File System)
        HDFS is a main component of Hadoop and a technique for storing data in a distributed manner in order to compute fast. HDFS stores data in blocks of 64 MB (the older default) or 128 MB, which is a logical splitting of data across the data nodes of a Hadoop cluster. All information about how the data is split across data nodes, known as metadata, is kept on the Name node, which is also part of HDFS.

3) MapReduce Framework
        It is another main component of Hadoop and a method of programming over distributed data stored in HDFS. We can write MapReduce programs in languages like Java, C++, Python, Ruby, etc. The name itself describes its functionality: Map applies the processing logic to the data, and once that computation is over, the Reducer collects the results of the Map phase to generate the final output. E.g. Word Count using MapReduce, sketched below.
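As a minimal sketch of the idea (plain Python that simulates the Map, Shuffle, and Reduce phases on a couple of made-up input lines, not actual Hadoop code):

from collections import defaultdict

def map_phase(line):
    # Map: emit (word, 1) for every word in the input line.
    return [(word, 1) for word in line.split()]

def reduce_phase(word, counts):
    # Reduce: sum all the 1's emitted for a given word.
    return (word, sum(counts))

lines = ["big data is big", "data is data"]   # hypothetical input split

# Shuffle: group the mapped pairs by word, as the framework would.
grouped = defaultdict(list)
for line in lines:
    for word, one in map_phase(line):
        grouped[word].append(one)

print([reduce_phase(w, c) for w, c in grouped.items()])
# [('big', 2), ('data', 3), ('is', 2)]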

4) HBASE
        HBase is a non-relational (NoSQL) database that runs on top of HDFS. HBase was created for very large tables with billions of rows and millions of columns, with fault-tolerance and horizontal scalability, and is based on Google's Bigtable.

5) HIVE
        Many programmers and analysts are more comfortable with Structured Query Language (SQL) than with Java or other programming languages, which is why HIVE was created and later donated to the Apache foundation. HIVE mainly deals with structured data stored in HDFS, using a query language similar to SQL known as HQL (Hive Query Language).

6) Pig
        Similar to HIVE, PIG also deals with structured data, using the Pig Latin language. PIG was originally developed at Yahoo for programmers who prefer scripting and do not want to write Java, Python, or SQL to process data. A Pig Latin program is made up of a series of operations, or transformations, applied to the input data, which are executed as MapReduce jobs to produce the output.

7) Mahout
        Mahout is an open-source machine learning library from Apache, written in Java. Mahout aims to be the machine learning tool of choice when the collection of data to be processed is very large, perhaps far too large for a single machine.

8) Oozie
        Oozie is a workflow scheduler that manages Hadoop jobs. It provides a mechanism to run a particular job at a given time and to repeat that job at predetermined intervals.

9) Zookeeper
        Zookeeper is a centralized service that provides coordination services (configuration, naming, and synchronization) for a Hadoop cluster. It allows developers to focus on core application logic without worrying about the distributed nature of the application. It is fast, reliable, and simple.

Explain how to compute Page Rank for any web graph.


  • Page Rank is a function that assigns a real number to each page on the Web. The intent is that the higher the Page Rank of a page, the more important it is.
  • Think of the Web as a directed graph where pages are the nodes and links between pages are the edges: there is an arc from page p1 to page p2 if there are one or more links from p1 to p2.
Consider the example below of a web with four pages (A, B, C, D). Page A has links to each of the other three pages, page B has links to A and D only, page C has a link only to A, and page D has links to B and C only.
Fig. A hypothetical example of the web

Suppose a random surfer starts at page A. There are links to B, C, and D, so this surfer will next be at each of those pages with probability 1/3, and has zero probability of being at A. A random surfer at B has probability 1/2 of next being at A or at D, and zero probability of being at B or C.
    In general, we can define a transition matrix of the Web to describe what happens to the random surfer after one step. This matrix has n rows and columns if there are n pages. The element m_ij in row i and column j has value 1/k if page j has k arcs out and one of them is to page i; otherwise m_ij = 0.
The transition matrix for this web, with rows and columns in the order A, B, C, D, is

        A    B    C    D
  A  [  0   1/2   1    0  ]
  B  [ 1/3   0    0   1/2 ]
  C  [ 1/3   0    0   1/2 ]
  D  [ 1/3  1/2   0    0  ]

Suppose we start a random surfer at any of the n pages of the Web with equal probability. Then the initial vector v0 will have 1/n for each component. If M is the transition matrix of the Web, then after one step the distribution of the surfer will be Mv0, after two steps it will be M(Mv0) = M^2 v0, and so on. In general, multiplying the initial vector v0 by M a total of i times gives the distribution of the surfer after i steps (this iteration is illustrated in the sketch after the list below).
    The probability x_i that a random surfer will be at node i at the next step is x_i = Σ_j m_ij v_j. Here m_ij is the probability that a surfer at node j will move to node i at the next step, and v_j is the probability that the surfer was at node j at the previous step.
    It is known that the distribution of the surfer approaches a limiting distribution v that satisfies v = Mv, provided two conditions are met:
  1. The graph is strongly connected, i.e., it is possible to get from any node to any other node.
  2. There are no dead ends, i.e., nodes that have no arcs out.
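As a quick numerical check of this iteration, here is a small Python sketch (assuming NumPy is available) that repeatedly multiplies the uniform starting vector by the transition matrix of the four-page example; the vector it settles on is the Page Rank of A, B, C, D:

import numpy as np

# Transition matrix M for the 4-page web (rows/columns ordered A, B, C, D).
M = np.array([
    [0,   1/2, 1, 0  ],   # probability of moving to A
    [1/3, 0,   0, 1/2],   # probability of moving to B
    [1/3, 0,   0, 1/2],   # probability of moving to C
    [1/3, 1/2, 0, 0  ],   # probability of moving to D
])

v = np.full(4, 1/4)   # start the surfer at any page with equal probability
for _ in range(50):   # repeated multiplication approximates the limit v = Mv
    v = M @ v

print(v)              # approximately [0.333, 0.222, 0.222, 0.222]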

Give a Map Reduce algorithm for the Natural Join of two relations and the Intersection of two sets.

Algorithm for Natural Join

To compute the natural join of relation R(A, B) with S(B, C), we must find tuples that agree on their B components, i.e., the second component of tuples from R and the first component of tuples from S. Using the B-value of tuples from either relation as the key, the value will be the other component along with the name of the relation, so that the Reduce function can know where each tuple came from.

Map-function
For each tuple (a, b) of R, produce the key-value pair (b, (R, a)). For each tuple (b, c) of S, produce the key-value pair (b, (S, c)).

Reduce function
Each key b will be associated with a list of values that are either of the form (R, a) or (S, c). Construct all pairs consisting of one value of the first form and one of the second, say (R, a) and (S, c). The output for key b is (b, [(a1, b, c1), (a2, b, c2), ...]), i.e., all joined tuples that share this B-value.

Pseudo Code:
map(key, value):
    if key == R:
        for (a, b) in value:
            emit (b, (R, a))
    else:
        for (b, c) in value:
            emit (b, (S, c))

reduce(key, values):
    list_R = [a for (x, a) in values if x == R]
    list_S = [c for (x, c) in values if x == S]
    for a in list_R:
        for c in list_S:
            emit (key, (a, key, c))
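The shuffle-and-reduce behaviour above can be simulated in plain Python to see the join happen; the relations R and S below are made-up sample data for illustration only:

from collections import defaultdict

# Sample relations: R(A, B) and S(B, C) -- hypothetical data.
R = [(1, 'x'), (2, 'y'), (3, 'x')]
S = [('x', 10), ('y', 20), ('z', 30)]

# Map phase: key on the shared B attribute, tag each value with its relation.
groups = defaultdict(list)
for a, b in R:
    groups[b].append(('R', a))
for b, c in S:
    groups[b].append(('S', c))

# Reduce phase: for each B value, pair every R tuple with every S tuple.
joined = []
for b, values in groups.items():
    list_R = [a for (tag, a) in values if tag == 'R']
    list_S = [c for (tag, c) in values if tag == 'S']
    for a in list_R:
        for c in list_S:
            joined.append((a, b, c))

print(joined)   # [(1, 'x', 10), (3, 'x', 10), (2, 'y', 20)]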

Algorithm to perform Intersection of two sets:

Mappers are fed all tuples of both relations R and S to be intersected. The Reducer emits only tuples that occurred twice. This is possible only if both sets contain the tuple, because tuples include the primary key and can therefore occur at most once in each relation.

Map function: Turn each tuple t into a key-value pair (t, t)

Reduce function: If key t has value list [t, t], then produce (t, t); otherwise produce nothing.

Pseudo Code:
map(key, value):
    for tuple in value:
        emit (tuple, tuple)

reduce(key, value):
    if value == [key, key]:
        emit (key, key)
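Again, a small plain-Python simulation makes this easy to check; the sample relations below are hypothetical:

from collections import defaultdict

# Hypothetical relations to intersect; each tuple appears at most once per relation.
R = [('a', 1), ('b', 2), ('c', 3)]
S = [('b', 2), ('c', 3), ('d', 4)]

# Map phase: every tuple t becomes the pair (t, t).
groups = defaultdict(list)
for t in R + S:
    groups[t].append(t)

# Reduce phase: keep a tuple only if it was emitted by both relations.
intersection = [t for t, values in groups.items() if len(values) == 2]
print(intersection)   # [('b', 2), ('c', 3)]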

Explain how dead ends are handled in Page Rank.

A dead end is a Web page with no links out. The presence of dead ends causes the Page Rank of some or all pages to go to 0 in the iterative computation, including pages that are not dead ends.
Dead ends can be eliminated before undertaking a Page Rank calculation by recursively dropping nodes with no arcs out. Note that dropping one node can cause another node, which linked only to it, to become a dead end, so the process must be recursive.
Two approaches to deal with dead ends:

  1. We can drop the dead ends from the graph, and also drop their incoming arcs. Doing so may create more dead ends, which also have to be dropped recursively. Eventually, however, we wind up with a strongly-connected component (SCC) none of whose nodes are dead ends. Recursive deletion of dead ends will remove parts of the out-component, tendrils, and tubes but leave the SCC and the in-component, as well as parts of any small isolated components.
  2. We can modify the process by which random surfers are assumed to move about the Web. This method, which we refer to as 'taxation', also solves the problem of spider traps. Here, we modify the calculation of Page Rank by allowing each random surfer a small probability of teleporting to a random page, rather than following an out-link from their current page. The iterative step, where we compute a new vector estimate of Page Rank v' from the current Page Rank estimate v and the transition matrix M, is
    v' = βMv + (1 - β)e/n
    where β is the chosen constant usually in the range of 0.8 to 0.9,
    e is a vector of all 1's with the appropriate number of components,
    n is the number of nodes in the Web graph.
Because the term (1 - β)e/n does not depend on the sum of the components of the vector v, there will always be some fraction of a surfer operating on the Web. That is, when there are dead ends, the sum of the components of v may be less than 1, but it will never reach 0.
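Below is a minimal Python sketch (assuming NumPy) of this taxation-style iteration, reusing the four-page transition matrix from the earlier example purely for illustration:

import numpy as np

beta = 0.85                    # chosen constant, typically 0.8 to 0.9
M = np.array([[0,   1/2, 1, 0  ],
              [1/3, 0,   0, 1/2],
              [1/3, 0,   0, 1/2],
              [1/3, 1/2, 0, 0  ]])
n = M.shape[0]
e = np.ones(n)                 # vector of all 1's

v = np.full(n, 1/n)            # uniform starting distribution
for _ in range(50):
    # v' = beta * M * v + (1 - beta) * e / n
    v = beta * (M @ v) + (1 - beta) * e / n

print(v)                       # taxed Page Rank estimate for A, B, C, D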

Give a 2-step Map Reduce algorithm to multiply two large matrices.

M is a matrix with element m_ij in row i and column j.
N is a matrix with element n_jk in row j and column k.
P = MN is a matrix with element p_ik in row i and column k, where p_ik = Σ_j m_ij * n_jk.
2-step Map Reduce

First Iteration
Record Reader 1: (M, i, j, m_ij), (N, j, k, n_jk)
Mapper 1: For each matrix element m_ij, emit the key-value pair (j, (M, i, m_ij)).
For each matrix element n_jk, emit the key-value pair (j, (N, k, n_jk)).
Shuffle 1: (j, [(M, i, m_ij), ..., (N, k, n_jk), ...])
Reducer 1: For each key j, for each value that comes from M, say (M, i, m_ij), and each value that comes from N, say (N, k, n_jk), emit the key-value pair (j, (i, k, m_ij * n_jk)).
The output of this reduce function is used as the input to the Map function in the next iteration.

Second Iteration
Mapper 2: For each key-value pair (j, (i, k, m_ij * n_jk)), emit the key-value pair ((i, k), p), where p = m_ij * n_jk.
Shuffle 2: ((i, k), [p1, p2, ..., pn])
Reducer 2: For each key (i, k), emit the key-value pair ((i, k), p_ik), where p_ik is the sum of the list of values associated with this key; this is the value of the element in row i and column k of the matrix P = MN.
i.e., ((i, k), p_ik)
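To make the two passes concrete, here is a small plain-Python simulation of both jobs over sparse matrices stored as (row, column, value) triples; the sample matrices are hypothetical:

from collections import defaultdict

# Sparse matrices as (i, j, value) and (j, k, value) triples -- sample data.
M = [(0, 0, 1), (0, 1, 2), (1, 0, 3)]
N = [(0, 0, 4), (1, 0, 5), (1, 1, 6)]

# --- First job: group M and N elements by j and multiply the paired elements.
groups = defaultdict(lambda: ([], []))
for i, j, m in M:
    groups[j][0].append((i, m))
for j, k, n in N:
    groups[j][1].append((k, n))

partials = []                          # pairs ((i, k), m_ij * n_jk)
for j, (ms, ns) in groups.items():
    for i, m in ms:
        for k, n in ns:
            partials.append(((i, k), m * n))

# --- Second job: group by (i, k) and sum the partial products.
P = defaultdict(int)
for (i, k), p in partials:
    P[(i, k)] += p

print(dict(P))   # {(0, 0): 14, (1, 0): 12, (0, 1): 12}; (1, 1) is 0, so it never appears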

Solve a problem using the Flajolet-Martin (FM) Algorithm to count distinct elements in a stream.

To estimate the number of distinct elements appearing in a stream, we can hash elements to integers, interpreted as binary numbers. 2 raised to the power of the longest run of trailing 0's seen in the hash value of any stream element is an estimate of the number of distinct elements.
Eg. Stream: 4, 2, 5, 9, 1, 6, 3, 7
Hash function, h(x) = (ax + b) mod 32
a) h(x) = (3x + 7) mod 32
b) h(x) = (x + 6) mod 32

a) h(x) = 3x + 7 mod 32
h(4) = 3(4) + 7 mod 32 = 19 mod 32 = 19 = (10011)
h(2) = 3(2) + 7 mod 32 = 13 mod 32 = 13 = (01101)
h(5) = 3(5) + 7 mod 32 = 22 mod 32 = 22 = (10110)
h(9) = 3(9) + 7 mod 32 = 34 mod 32 = 2 = (00010)
h(1) = 3(1) + 7 mod 32 = 10 mod 32 = 10 = (01010)
h(6) = 3(6) + 7 mod 32 = 25 mod 32 = 25  = (11001)
h(3) = 3(3) + 7 mod 32 = 16 mod 32 = 16  = (10000)
h(7) = 3(7) + 7 mod 32 = 28 mod 32 = 28  = (11100)
Trailing zeros: {0, 0, 1, 1, 1, 0, 4, 2}
R = max[trailing zeros] = 4
Output = 2^R = 2^4 = 16

b) h(x) = x + 6 mod 32
h(4) = (4) + 6 mod 32 = 10 mod 32 = 10 = (01010)
h(2) = (2) + 6 mod 32 = 8 mod 32 = 8  = (01000)
h(5) = (5) + 6 mod 32 = 11 mod 32 = 11  = (01011)
h(9) = (9) + 6 mod 32 = 15 mod 32 = 15 = (01111)
h(1) = (1) + 6 mod 32 = 7 mod 32 = 7 = (00111)
h(6) = (6) + 6 mod 32 = 12 mod 32 = 12 = (01100)
h(3) = (3) + 6 mod 32 = 9 mod 32 = 9 = (01001)
h(7) = (7) + 6 mod 32 = 13 mod 32 = 13 = (01101)
Trailing zeros: {1, 3, 0, 0, 0, 2, 0, 0}
R = max[trailing zeros] = 3
Output = 2^R = 2^3 = 8
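The whole computation is easy to reproduce with a few lines of Python; this sketch simply re-implements the trailing-zero count and the two hash functions from the example:

def trailing_zeros(n, width=5):
    # Number of trailing 0 bits in the 5-bit hash value (values are < 32).
    if n == 0:
        return width
    count = 0
    while n % 2 == 0:
        n //= 2
        count += 1
    return count

stream = [4, 2, 5, 9, 1, 6, 3, 7]

for name, h in [("3x + 7 mod 32", lambda x: (3 * x + 7) % 32),
                ("x + 6 mod 32",  lambda x: (x + 6) % 32)]:
    R = max(trailing_zeros(h(x)) for x in stream)
    print(name, "-> estimate =", 2 ** R)
    # prints 16 for the first hash function and 8 for the second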

Find the Manhattan distance (L1-norm) and Euclidean distance (L2-norm) for X = (1, 2, 2) and Y = (2, 5, 3).


1) Manhattan Distance (L1) is given as,
        L1(X, Y) = Σ_i | x_i - y_i |

Here, L1(X, Y) = | 1 - 2 | + | 2 - 5 | + | 2 - 3 |
              = 1 + 3 + 1
              = 5

2) Euclidean distance (L2) is given as,
        L2(X, Y) = √( Σ_i (x_i - y_i)^2 )

Here, L2(X, Y) = √( (1 - 2)^2 + (2 - 5)^2 + (2 - 3)^2 )
              = √( 1 + 9 + 1 )
              = √11
              ≈ 3.3166
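The same arithmetic in a few lines of Python, using only the standard library:

import math

X = (1, 2, 2)
Y = (2, 5, 3)

# L1 norm: sum of absolute coordinate differences.
manhattan = sum(abs(x - y) for x, y in zip(X, Y))

# L2 norm: square root of the sum of squared coordinate differences.
euclidean = math.sqrt(sum((x - y) ** 2 for x, y in zip(X, Y)))

print(manhattan)   # 5
print(euclidean)   # 3.3166...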

Define 3 V's of Big Data


Big Data is characterized by 3 V's, which are as follows:
1) Volume,
2) Velocity,
3) Variety

 1) Volume (Data at Rest)
-> The name 'Big Data' itself relates to an enormous size. The size of data plays a very crucial role in determining the value of the data, and whether a particular dataset can actually be considered Big Data or not depends upon its volume.
-> Here, the size of data ranges from terabytes up to exabytes and beyond.

 2) Velocity (Data in Motion)
-> The term velocity refers to the speed at which data is generated from different sources. How fast the data is generated and processed to meet demand determines the real potential in the data.
-> Big Data velocity deals with the speed at which data flows in from sources like business processes, application logs, networks and social media sites, sensors, etc.
-> Streaming data leaves only milliseconds to seconds to respond.

3) Variety (Data in Many Forms)
-> Variety refers to heterogeneous sources and the nature of the data, whether structured, semi-structured, or unstructured. Earlier, spreadsheets and databases were the only data sources considered by most applications.
-> Nowadays, data in the form of emails, photos, videos, PDFs, audio, etc. is also considered in analysis applications. This variety of unstructured data creates many issues for storing, mining, and analyzing data.