Sunday, 23 December 2012

Data Analytics : Hatari !

Embedding Pig in Python - Hatari!
Pig Latin does not allow control structures like Java and other high-level languages. So inorder to get more control over the execution context, loop through a set of parameters etc we can embedd Pig in Python. This is supported only for Python 2.7 and not above. At first this may seem not so useful. But, if you have a single script, i.e a parameterised pig script like the one in the previous post, you can attach parameters to the script from Python. Plus since Python can also be used to write User Defined Functions, all the code Pig, Python, UDFs reside in the same project. This is usefull when things get more complex. In the previous post note that, I used a java project to create a java UDF which was used in Pig. So I had 2 deployments a pig script and a Java jar. But, if we embedd pig in python, we have only one deployment, the python code.

1) Write pig script in a file.
2) Use compileFromFile() function to load and compile your script. 
3) Bind using your list of map of parameters. yes a list of map of parameters.
4) Run the script using runSingle()
5) Get status on how things went using runSingle(). You can also use isSuccessFul().

Better yet Do this again for all your scripts. You can even read parameters from a file, add threading where each thread will execute a script with one set of parameters.

Source code with an example is available here

Ref: More details can be found in the presentation at by Julien Le Dem.

Ref: Good Read Chapter 9 on Book, Programming Pig by Alan F Gates

Data Analytics: Using Apache Pig User Defined Functions

Apache pig helps to analyse large data sets. We can define data flows in pig latin scripts and extract interesting information from the initial data set. For example, we may have a large data dump of user actions on your website as a csv file or in a database like hbase. Then we want to find the most frequented part of your site or the top 10 things that interests your user base in your site. To write up a solution for this is possible in a highlevel language like Java. The optimizations for handling data size as it grows, splitting the task into multiple parts, keeping track of those individual tasks will be challenging and will consume a lot of effort. Apache Hadoop and Pig help to solve this problem by providing a framework with which we can focus on the data flow rather than on the plumbing.

Derieving interesting data is always tied to a time period. We may want to extract interesting information from the whole life time of the data or we want to perform the same on a given time period say the last month or week. To specify such options, we have user defined functions in pig. This allows us to write filters and operations that we want pig to perform on each entry of the data set. This gives more control on the data filtering, flow. Some of the nuances of pig udf are explained in the example below.

Pig Version of this example: Apache Pig version 0.10.0 (r1328203)

Objective: You want to write a filter function in PIG to filter data rows according to a date range that you are interested in. You want to invoke the script from a scheduler which passes in the date range as command line parameters. A pig script is shown in the image below.

1) Passing command line parameters to pig script: You need to pass command line arguments like this pig -param datefrom='2012-10-20 00:00:00' -param dateto='2012-10-23 00:00:00' -x mapreduce user-based-analytics.pig. (I am actually calling the script from Python, which we will see in the next post).

Here I am using these two date parameters to build my Java UDF. If you are passing parameters with space character, it has to be like this otherwise pig will throw an error saying that, it cannot create your UDF java class.

2) With in the pig script: You refer to the command line parameters using the format '$' for example, '$dateto' and '$datefrom' as in the image above. unless it is an integer like $parallelism.

3) Defining your UDF reference using DEFINE pig keyword: This allows you to create a reference to your UDF which you can call using an alias. For example, the script defines a reference to the UDF as follows,

define date-based-filter com.home.pig.udfs.TimeStampFilter('$datefrom', '$dateto')

where date-based-filter is the alias that I will use to call my UDF com.home.pig.udfs.TimeStampFilter java class.

4) Calling your UDF filter in a pig script using FILTER keyword: Pig does not have a boolean data type. But, expressions are evaluated to boolean true or false. You need to call your UDF as follows, with the alias for your UDF. Here we are checking for datebasedfilter(ts) == TRUE i.e does my UDF 'com.home.pig.udfs.TimeStampFilter' acting on the current row with 'dateto' and 'datefrom' return Java Boolean true or false.

filter-by-date = filter site-data by date-based-filter(ts) == TRUE;

5) Now the Java Class that does the filtering.

a) We have to create a java class that extends FilterFunc from org.apache.pig.FilterFunc. The constructor has to take parameters that, you set in the script above. So we have two parameters.

b) Override the public Boolean exec(Tuple arg0) member function to define how this filter will handle tuples from the script. Here I just get the date from the string and check if it is within the range.

c) Package this in a jar and put it in the same location path as your script.

Why use Pig and UDF
s? Writing UDFs can be easy and saves a lot of time compared to writing a MapReduce Java program or any other option. Plus, if you have a ton of data or will end up with one this is better option since Hadoop will scale and Pig will do the jobs like data groupings, filtering for you.

Better to use Python? Although it is easy to write the UDF in Java and the justification that Pig is in Java, there is already a java environment turned on; it may be better to write User Defined Functions in Python and also trigger the script in for greater control! plus every thing will be at one place.

For more on this topic Refer to Chapter 10 in Book, Programming Pig by Alan F Gates

Thursday, 16 August 2012

Go Parallel

Objective : To Parallelise merge sort algorithm.

The part of merge sort that presents opportunity for parallel programming is the merge part. This takes two sorted arrays and merges them. The usual algorithm compares each element of the two arrays and writes them in order and finally writing the array with more elements, if any. 

For parallel merging there is another algorithm, for two arrays A and B. Choose an element X of A so that it divides A into two almost equal parts A1 and A2. Find where X will fit into B (using binary search). Split B at this point into B1 and B2. Merge A1 and B, A2 and B2 recursively. The advantage is that, these two merges can be implemented by independant hardware such a multiple processors since the loop dependancy of the serial algorithm is removed in the parallel algorithm. The program will provide a visible speed up on Cilk plus for intel. But, on Java vm on my 2 core processor machine the speed up is overrun with the overhead for scheduling, Executor service in Java concurrency package.

The run times of parallel and serial merges are shown below. The overhead of Future, Callable, Executor in java concurrency package is coupled with the recursive calls.

The parallel merge is 10 times slower with two cores But, it is up to the jvm to parallelise. In Java, programmers use concurrency package for parallel executions whereas C++ has a number of opensource options for targeting multi core parallelism. The speedup approaches  serial speeds depending on the judicious choice of when to stop the recurssion tree (ie the number of elements at which we switch to serial merge!).

The Java code part spawns a new thread for one of the merges described above and the calling thread continues with the current merge. We can use two threads for each of the sub-merges but, it adds to the overhead. The first approach is shown below with the latter commented.
This is a fork and join approach/pattern for parallel programming. We can also use a map pattern for parallel programming by dividing the array conveniently and merging at multiple stages using a reducer, maybe that will give a lot more direct speed up than fork-join pattern.

Reference: This technique for parallel merge sort originally appearing in 'Structured Parallel Programming' from Morgan Kaufman. If interested read this book.

Monday, 16 July 2012

Living with Linux Desktops: KDE4 V Xfce on Opensuse 12.1

Here is a brief comparison of Xfce desktop and KDE desktop that comes with opensuse linux 12.1. The screen shots for the konsole using top command for memory and system monitor shows the difference. Notable is that KDE Dolphin uses 25MB whereas, Thunar the default file browser for Xfce used 17MB. Half of Dolphin. On CPU and memory Xfce desktop has half resource usage compared to KDE4. I have not used KDE plasma graphics too much on my desktop. KDE4 desktop is configured to use clean looks and use suse look and feel. Having Xfce on an older hardware can make it faster or comparable to a new hardware running latest operating system. 

File Manager
Dolphin 25MB
Thunar 17MB

          CPU(2cores)      Memory
KDE      6, 3.9%                      0.30 GB / 2.0 GB
Xfce     2, 3.9%                      0.16GB / 2.0 GB

Xfce feels responsive and faster. In fact too fast. The one disadvantage of Xfce is that, sometimes the widgets/panel apps don't work. For example, the window manager had to be started manually at command line. Or the windows did not have close/minimize/maximize buttons. Again, Lock Screen does not seem to invoke any response from Xfce and also resuming after hibernating does not prompt for password by default.


KDE Top and System Monitor

Xfce Top and System Monitor

Monday, 9 July 2012

Friend Suggestion like Facebook, Google+ or Product Suggestion like Amazon: Implementing for your own web app, site

This is an application of Apache Hadoop MapReduce framework. Facebook has a feature that says that, some people may be your friends. This is calculated based on a number of factors such as common friends, likes, visits to profile,
comments made etc. Basically the interactions is logged, analyzed and put through a kind of weighted matrix analysis and the connections which are above a threshold value are chosen to be shown to the user.

Problem: An input file of 1GB size with the logs of interactions among 26,849 people. To keep the data manageable on my computer, I am creating random logs with interaction of 1000 people with the rest of the people on the, say social
networking site. Gathering and presenting this data to map reduce is itself a job. In our case, these 1000 people need friend suggestions.

Solution: Here this is done in a Single map reduce job although, more data and intermediate jobs can be added in real life. The mapper for the job takes each interaction and emits the user pair and interaction statistics. For example,
Sam Worthington has 1 profile view, 2 friends in common, 3 likes with Stuart Little. so the out put of the mapper here is . We could use the userid across the site rather than a name. For each person interacting with Stuart Little a record like this is produced.

The reducer simply takes the output of the mapper and calculates the sum of the interactions per user-user pair. So, for a week of interactions the record from the reducer may be, for example, . This is got by summing up the interactions by the reducer. Then, each interaction can be given a weight/value then, the number in the record can be multiplied by the weight and summed. Any calculation is ok as long as it give a meaning result based on the interaction types. Then, interactions with a score larger than a threshold can be chosen to be presented to the user when they login.

Other factors like user interests, activities etc can also be accounted for. Here we can simple treat each as a user and emit a record if there is a related activity in user's profile and proceed as usual. The same approach can be applied to ad serving utilizing info from user profile and activity. A Combiner and mutiple jobs can be used to get more details too.

Results: For 1GB, in real life big data approaches > 2TB, of randomly generated log of interactions of 26K+ people with a select 1000, the mapreduce program gave the friend suggestions in 13 mintues on my laptop with a pseudo distributed Hadoop configuration. So, if you have a few tera bytes of data and 10 computers in a cluster you can get the result in say 1.5 hours. Then all that is needed would be to consume the friend suggestions via a regular db / webservice and present it to the user when they log in.

Screen shots of the Map Reduce job run on pseudo distributed Hadoop:

1) Copying the data to Hadoop Distributed File System


2) Running the jobs

3) Status on Hadoop admin interface.

4) Friend Suggestions with statistics.

5) Job code

6) Mapper

7) Reducer

Sunday, 1 July 2012

Apache Hadoop Map Reduce - 1

Hadoop is a map reduce framework that allows for parallel distributed computing. The criteria for adopting Hadoop is that, the same operations are performed on records and records are not modified once created. So Reads are more frequent than write in a traditional relational database system. The advantage of Hadoop or a Map Reduce frame work comes into action when the data is large enough ~ 1TB. Above this point data is referred to as big data approaching PetaB. At such a point it is better to throw multiple machines at the task rather than to use a single machine with parallel programming. The technique used is Map Reduce which deals with Key and value pairs.

A map operation is performed on records whose inputs and outputs are key value pairs themselves. The output is merged, sorted and submitted to a reduce operation which combines and generates the output. For example, we have 12 years of student records for examinations across 15 subjects. The total number of records is 5,700,000. On this data set we can find operations such as highest score on a subject over the years, total failure reported across the year etc. To find the total failures, the map operation outputs a key value pair   every time it encounters a failed exam. The reduce job takes all such key value pairs and emits a Key value pair .

The hadoop standalone configuration on these records finished in ~ 35-40 seconds. The hadoop run start is shown here
 The run end is here
Output is here. 

The same on a pseudo clustered mode gives results for similar operations in ~40 seconds. The difference is that, the input files have to copied into the Hadoop distributed file system and the output needs to be copied/read from the HDFS. As shown here

Saturday, 2 June 2012

Java Concurrency - Beyond Synchronized

Synchronized keyword is frequently used in Java. But, Synchronized cannot provide a method for threads to signal one another of conditions that others may be waiting on. Another thing is that synchronized does not act on a fairness policy. A thread that is starved for quite some time need not get access over others. i.e the fact that you have been waiting for some time may not be considered. 

An example for conditions is availability of elements in a shared data structure queue. Such a facility is provided by java.util.concurrent.locks.Condition. This allows threads to wait for a condition on a lock and relinquish the lock so that, other threads can full fill the condition that is needed for the waiting threads to continue. This logic can be programmed using locks but, Condition provides this off the shelf. Once the lock is acquired thread waits for the condition mandatory for it to continue. When the condition is true it continues. 

Fairness policy is allowed by a Reentrant lock where the thread waiting for quite sometime will get access. 
The intriguing part of threading in Java is that, the time (in ms) between a consumer / producer is able to regain access to a shared data structure is the same with Condition as it is without Condition. 

Example: Producers & consumers waiting on a lock & its condition. Both share a common queue where the producers put in integers for the consumers to take, contending for the queue access at the same time.  

Code Structure

Code with Synchronized keyword.

Condition creation code.

Code with Condition.

The consumer comes back on receiving the signal

Thursday, 31 May 2012

Java Design Pattern : Service Provider Interface and Factory method

It is common to accomodate multiple service providers while designing an API. Each service provider may implement a use-case that will be used by API client applications. For example, Java security has a number of providers SunJCE, BKS etc which provide security operations such as encryption. Two common patterns that come into play involve defining service provider interfaces, factory classes/methods to get the appropriate target object. Service provider interface is implemented by the service provider classes and there may be more than one type of class that provides a functionality like encryption. There are DESCipher, AESCipher etc all of which implement CipherSpi which is the service provider interface definition. Again the Cipher engine class encapsulates a CipherSpi and creates the appropriate cipher through its factory method getInstance(). The provider list is usually read from a configuration file. Here we use a simple example to demonstrate spis, spiImpls.

Suppose we want a logger. We can define the logger spi which has the functionalities that we want from a logger. We provide two implementations of this spi and annotate the classes with spiImpl. This annotation is not  mandatory. The class name can be anything but, if name conventions are followed then, code becomes maintainable in addition to being simpler. The engine class can be Logger which takes a string parameter which specifies whether we need a File based logger or a Console logger. The Logger class acts like a factory for creating Logger classes and allows client apps to perform logging through it. Although this is a small example, it gets the idea across. SPI, SPIImpl classes are very frequently used in Java 2 Enterprise projects just like Facade pattern. The spi pattern allows us to add different types of providers and also implementations of the interface.

Here is the structure for the project packages.

The sample SPI

Spi Implementation example

Factory method

Application using Logger

Monday, 14 May 2012

Facade Pattern - Java Code Architecture

Object oriented programming involves instances of classes i.e objects interacting with one another towards a common goal. This interaction can be for services, control or data. Objects perform a particular function in a software. When there are a lot of objects the code becomes cumbersome to maintain and use; every time in the same piece of software. 

Facade pattern comes into action when you have a subsystem that performs a particular service for another subsystem or part of the code. A subsystem here in terms of software code means a group of objects interacting to get a job done. Facade pattern hides the objects underneath it and their interactions to object layers above the pattern. For example, information collected from the user interface in a java application may be used to create say, a user object, account object and the user interface may store this to the database by using an object that performs that database operation. Here a Facade object can expose an operation, i.e creating the objects from the data from UI and storing it to the database. This facade can be used by the UI without delving into the details of how it is done. 

If the interaction of the objects in the facade changes, it is contained within the facade and no layer of code outside the facade is affected. This is the most important aspect of the facade pattern that makes it so useful in Java Enterprise where this pattern is more frequently seen in popular Java Enterprise IDE's ready-made-code.

For example, suppose there is a table in the database that stores the details of attendees in a conference. Java enterprise wraps this with an Entity say, Attendee entity which would be part of the JPA ready-made-code. This entity will expose the database operations on the table. A facade would be created namely a session facade which exposes operations that are utilised by controller layers to update information.  The session facade will deal with the requests on the Attendee Entity. 

Here is an example of the code structure for a Tomcat based application that gets information in a table. Called from a servlet. i.e servlet is the controller in Model-View-Controller Application model. 

as you can see from this code, the facade is called to do the job. The code to get a job done becomes small as far a controller object is concerned. A lot of things may happen under the facade but, the higher layer object does not care about this. It is the Facade's responsibility. The code structure in eclipse web project is shown below. There are entity objects and Facade objects. Facade objects here share a common ancestor which declares common operations on entity-facades. In this example, entity objects simulate the JPA entity objects.

The abstract facade is not a must in the general facade pattern but, is used here as is further simplifies code with inheritance. A part of abstract facade is shown here 
A subclass of this called Attendee Facade used in our example is here 
A lot of object interactions and code is thus abstracted from a controller object / higher level object by using the Facade Pattern. It also makes the code more reusable, maintainable, predictable and beautiful.

Thursday, 5 April 2012

Skip Lists Algorithm and Runtime Profiling

Skip List Runtime on dictionay operation compared to Splay Trees and Hash Tables. (Coded in Java)

Skip lists are a relatively new data structure. This is an ordered list where the nodes are connected together at multiple levels. Different levels of connected nodes need not be uniform. This allows the skip list to skip items during a search. Each level is an ordered list in itself.  Every node has a level and is connected to different nodes in the same level. This allows data structure operations to skip through the list. Example is the following list where there are two levels.

Level 1 connections                  E ----------> X
Level 0 connections A ->; B -> E -> F -> G->X -> Y -> Z

Level 0 list is the same as a linked list. E has connections has two levels. If a search was for say 'G'. The algorithms is as follows Start at Level 1, X > F so we move down at 'E' and then proceed at level 0 from E till G is found. Nodes A and B were skipped.

SkipList Search Algorithm:
1) Start from the top level proceed until level 0
    a) Move to the right until a node with node Key < search Key
    b) Jump to the next lower level.
2)  If 1a resulted in an exit in level 0, the next node could be the search node. else it is absent.

Here is a skip list implementation holding values A to Z. The nodes encountered during the search for H and M are shown.

Level 2 Skip List. 

Level 6 Skip List
In order for the list to be helpful level at each node needs to be decided at Random during the insert operation of that node. The maximum level that is useful in a skip list is ln N -1 where N is the number of items expected to go into the skip list. 

Algorithm Inserting in to the Skip List

1 Find the node as in search. This will return the node that will be to the left of the new node (L).
   a) While dropping down to the lower levels maintain the nodes in an update List.
2 Create the new node with a random level. 
3 Adjust the pointers from the existing L node to the new node.If the new level is greater than the existing level of the list, then the head of the list should be adjusted to point to the new node at the new level.
4 Set the next pointers of the new node to next pointers of L
5 Set next pointers of L to the new node at that level. This is done for each node where level was changed in step 1.

For example in the case of 
Level 1 connections                  E ----------> X
Level 0 connections A -> B -> E -> G->X -> Y -> Z. We need to insert H with level 3 then, at level 1 we drop down at E and also at G in level 0. So, G->next = H at level 0, E-> next = H at level 1, F -> next = X at level 1 and at level 2 H is linked to head and tail of the list.

Level 2 connections                                  *<-H->*
Level 1 connections                  E ----------->H-> X
Level 0 connections A -> B -> E -> F -> G->H->X -> Y -> Z

Runtime of Skip Lists: Dictionary operations on skip lists take O(Log(n)) time. Finding each new node's level is done in random as follows

Comparison of Skip List, Splay Binary Search Trees and Hash Table

For a serious input of 56000 word dictionary, the search operations on Skip lists, Splay Trees and Hash Table is as follows. Skip list took 40 comparisons at 11 levels to find the word 'Skip' in the dictionary.

Splay tree took less time than the skip list at
Hash Table left the above two way behind in run time

Result: In general skip lists and splay trees can't hold candle to hash tables. 

Wednesday, 4 April 2012

Ant Power : Build Automation

Ant is helpful in any java project involving large number of resources. Ant based on xml and java itself can help to compile java classes, put the classes in a jar or war, create file folders before building projects and interfacing with servers like tomcat to perform application installation. This is useful in java enterprise web applications and enterprise applications. Ant ships with popular IDEs like eclipse and Rational Application Developer.

Ant manual webpage  provides a sample build.xml file that can be customized by developers instead of writing it from scratch. This is a modification of that, file with modifications pertaining to ; building webapps for tomcat 7.0.26. For example a change is the compilations of classes and creating a war using war rather than jar (purpose!). Some changes were needed for tomcat 7.0.26 for example the manager url.

The sample folder structure used for eclipse dynamic web application is shown here.

The project tag is here. Base dir is .. since the ant build.xml is in 'Ant' folder. Multiple ant files can be grouped in this folder in a huge project.

The properties declaration that I use

The compile element with changes

The dist element which creates the distribution war file in the dist folder.

New elements to stop, remove, compile, install and start the web application.
The resulting WAR file contents