Monday, January 7, 2013

Why I switched to MapR and love EMR ...

A few months back I was approached by a colleague interested in using Hadoop/Hive to power our Tableau clients.  We played around for a bit and learned, the hard way, that the only way we were going to get this to go easily was switching to a MapR distribution.

Fortunately, EMR (Elastic MapReduce) makes this a snap.

./elastic-mapreduce --create --alive --hive-interactive --name "Hive Dev Flow" --instance-type m1.large --num-instances 8 --hadoop-version 1.0.3 --hive-versions 0.8.1.6 --ami-version 2.3.0 --with-supported-products mapr-m3

Literally, you just need to add the parameter (--with-supported-products mapr-m3).

Next you need to open up port 8453 (and possibly some others) on your Hadoop master at EC2 (in Security Groups), and then run the MapR ODBC connector tool.  Now you are ready to rock (with any ODBC Hadoop client).  One noted caveat is that you can't run Ganglia for stats, but as I noted in the EMR forum you can use the MapR Control Center to the same effect.
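If you're still on the classic EC2 command line tools, opening the port is a one-liner along these lines.  The group name assumes EMR's default master security group, and the wide-open source CIDR is just for illustration, so swap in your own:

ec2-authorize ElasticMapReduce-master -P tcp -p 8453 -s 0.0.0.0/0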

Now I have no idea if I will stick with MapR, but the ease of switching between Hadoop distributions, AMIs, versions, etc. with EMR is pretty slick.  It did take some trial and error to figure out a working Hadoop/Hive/AMI version combo ... but nothing too crazy.

I know things like Apache WHIRR are making strides, but for now I am very happy with EMR.

Peace,
Tom


Thursday, December 20, 2012

using MapReduce to move files

One of the biggest issues in dealing with large volumes of data is moving it around.  Thankfully, we use Amazon's S3 as a store and process via their EMR.  Access inside their network is pretty snappy.

Still, I am moving a lot of data into our Hadoop clusters every morning.  This data needs to go through our master and be distributed onto all of the slave nodes (normally 7).  The following took a hair over 20 minutes.
hadoop fs -cp s3://eCollegeGradeData/activity/*.CSV /temp4/. 

12/12/20 17:12:50 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_20120127.CSV' for reading
12/12/20 17:12:51 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_20120128.CSV' for reading
12/12/20 17:12:51 INFO s3native.NativeS3FileSystem: Opening
...
12/12/20 17:32:22 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_Historical_Extract_20120126_7.CSV' for reading

S3DistCp is an S3-enabled version of Apache Hadoop's DistCp that uses MapReduce (well, Map at least) to move the files directly from S3 to the Hadoop slave nodes, eliminating the master node bottleneck.
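On EMR the S3DistCp jar ships on the master, and an invocation looks roughly like this (jar path and flags are from the EMR docs of this vintage, so double-check them against your AMI).  The run below just uses plain distcp against s3n://, which gets the same map-only, node-parallel effect on this cluster:

hadoop jar /home/hadoop/lib/emr-s3distcp-1.0.jar --src s3://eCollegeGradeData/activity/ --dest hdfs:///temp4/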

hadoop distcp s3n://eCollegeGradeData/activity/ /temp4/
12/12/20 17:41:49 INFO tools.DistCp: srcPaths=[s3n://eCollegeGradeData/activity]
12/12/20 17:41:49 INFO tools.DistCp: destPath=/temp4
12/12/20 17:41:50 INFO metrics.MetricsSaver: MetricsSaver DistCp root:hdfs:///mnt/var/lib/hadoop/metrics/ period:60 instanceId:i-b0bbf4ce jobflow:j-3IZ0J8F1W334P
12/12/20 17:41:50 INFO metrics.MetricsUtil: supported product mapr-m3
12/12/20 17:41:50 INFO metrics.MetricsSaver: Disable MetricsSaver due to MapR cluster
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO tools.DistCp: sourcePathsCount=337
12/12/20 17:41:51 INFO tools.DistCp: filesToCopyCount=336
12/12/20 17:41:51 INFO tools.DistCp: bytesToCopyCount=13.0g
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO mapred.JobClient: Running job: job_201212191754_0027
12/12/20 17:41:52 INFO mapred.JobClient:  map 0% reduce 0%
12/12/20 17:42:06 INFO mapred.JobClient:  map 2% reduce 0%
12/12/20 17:42:07 INFO mapred.JobClient:  map 5% reduce 0%
12/12/20 17:42:08 INFO mapred.JobClient:  map 8% reduce 0%
12/12/20 17:42:09 INFO mapred.JobClient:  map 9% reduce 0%
12/12/20 17:42:10 INFO mapred.JobClient:  map 14% reduce 0%
12/12/20 17:42:11 INFO mapred.JobClient:  map 16% reduce 0%
12/12/20 17:42:12 INFO mapred.JobClient:  map 29% reduce 0%
12/12/20 17:42:13 INFO mapred.JobClient:  map 31% reduce 0%
12/12/20 17:42:14 INFO mapred.JobClient:  map 39% reduce 0%
12/12/20 17:42:15 INFO mapred.JobClient:  map 41% reduce 0%
12/12/20 17:42:16 INFO mapred.JobClient:  map 43% reduce 0%
12/12/20 17:42:17 INFO mapred.JobClient:  map 45% reduce 0%
12/12/20 17:42:18 INFO mapred.JobClient:  map 47% reduce 0%
12/12/20 17:42:19 INFO mapred.JobClient:  map 50% reduce 0%
12/12/20 17:42:20 INFO mapred.JobClient:  map 51% reduce 0%
12/12/20 17:42:21 INFO mapred.JobClient:  map 53% reduce 0%
12/12/20 17:42:22 INFO mapred.JobClient:  map 54% reduce 0%
12/12/20 17:42:23 INFO mapred.JobClient:  map 56% reduce 0%
12/12/20 17:42:24 INFO mapred.JobClient:  map 57% reduce 0%
12/12/20 17:42:25 INFO mapred.JobClient:  map 58% reduce 0%
12/12/20 17:42:27 INFO mapred.JobClient:  map 60% reduce 0%
12/12/20 17:42:30 INFO mapred.JobClient:  map 65% reduce 0%
12/12/20 17:42:32 INFO mapred.JobClient:  map 66% reduce 0%
12/12/20 17:42:33 INFO mapred.JobClient:  map 69% reduce 0%
12/12/20 17:42:35 INFO mapred.JobClient:  map 70% reduce 0%
12/12/20 17:42:36 INFO mapred.JobClient:  map 71% reduce 0%
12/12/20 17:42:37 INFO mapred.JobClient:  map 73% reduce 0%
12/12/20 17:42:38 INFO mapred.JobClient:  map 74% reduce 0%
12/12/20 17:42:39 INFO mapred.JobClient:  map 75% reduce 0%
12/12/20 17:42:40 INFO mapred.JobClient:  map 76% reduce 0%
12/12/20 17:42:42 INFO mapred.JobClient:  map 77% reduce 0%
12/12/20 17:42:44 INFO mapred.JobClient:  map 78% reduce 0%
12/12/20 17:42:58 INFO mapred.JobClient:  map 79% reduce 0%
12/12/20 17:43:02 INFO mapred.JobClient:  map 83% reduce 0%
12/12/20 17:43:03 INFO mapred.JobClient:  map 86% reduce 0%
12/12/20 17:43:04 INFO mapred.JobClient:  map 87% reduce 0%
12/12/20 17:43:05 INFO mapred.JobClient:  map 88% reduce 0%
12/12/20 17:43:07 INFO mapred.JobClient:  map 90% reduce 0%
12/12/20 17:43:09 INFO mapred.JobClient:  map 92% reduce 0%
12/12/20 17:43:11 INFO mapred.JobClient:  map 93% reduce 0%
12/12/20 17:43:12 INFO mapred.JobClient:  map 94% reduce 0%
12/12/20 17:43:14 INFO mapred.JobClient:  map 97% reduce 0%
12/12/20 17:43:39 INFO mapred.JobClient:  map 98% reduce 0%
12/12/20 17:43:45 INFO mapred.JobClient:  map 99% reduce 0%
12/12/20 17:43:51 INFO mapred.JobClient:  map 100% reduce 0%
12/12/20 17:46:43 INFO mapred.JobClient: Job complete: job_201212191754_0027
12/12/20 17:46:43 INFO mapred.JobClient: Counters: 21
12/12/20 17:46:43 INFO mapred.JobClient:   Job Counters
12/12/20 17:46:43 INFO mapred.JobClient:     Aggregate execution time of mappers(ms)=1924883
12/12/20 17:46:43 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:     Launched map tasks=34
12/12/20 17:46:43 INFO mapred.JobClient:     Aggregate execution time of reducers(ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:   FileSystemCounters
12/12/20 17:46:43 INFO mapred.JobClient:     MAPRFS_BYTES_READ=84770
12/12/20 17:46:43 INFO mapred.JobClient:     S3N_BYTES_READ=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     MAPRFS_BYTES_WRITTEN=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1659360
12/12/20 17:46:43 INFO mapred.JobClient:   distcp
12/12/20 17:46:43 INFO mapred.JobClient:     Files copied=336
12/12/20 17:46:43 INFO mapred.JobClient:     Bytes copied=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     Bytes expected=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:   Map-Reduce Framework
12/12/20 17:46:43 INFO mapred.JobClient:     Map input records=336
12/12/20 17:46:43 INFO mapred.JobClient:     PHYSICAL_MEMORY_BYTES=4710170624
12/12/20 17:46:43 INFO mapred.JobClient:     Spilled Records=0
12/12/20 17:46:43 INFO mapred.JobClient:     CPU_MILLISECONDS=530380
12/12/20 17:46:43 INFO mapred.JobClient:     VIRTUAL_MEMORY_BYTES=56869019648
12/12/20 17:46:43 INFO mapred.JobClient:     Map input bytes=50426
12/12/20 17:46:43 INFO mapred.JobClient:     Map output records=0
12/12/20 17:46:43 INFO mapred.JobClient:     SPLIT_RAW_BYTES=5100
12/12/20 17:46:43 INFO mapred.JobClient:     GC time elapsed (ms)=3356
12/12/20 17:46:43 INFO metrics.MetricsSaver: Inside MetricsSaver Shutdown Hook

Now, I should have gone looking for a better solution months ago ... but frankly this wasn't that huge of an issue.  I went from 20 minutes to 5 on this particular copy and have seen it go even lower than that.  If you add it all up across the morning copies, I was able to reduce my execution time by half an hour.

Not too shabby!


Friday, November 16, 2012

Machine Learning - predicting outcomes from activity

So I have been meandering through Andrew Ng's (Stanford) excellent Machine Learning class at Coursera, which I highly recommend.  

My progress in the course has been slowed by the fact that I keep running across things I can use right now.  I'm in the unique position of having an applicable data set as well as real-world use cases.  Once I found Mahout (scalable machine learning libraries that run on top of Hadoop) I was in.

I'm far less interested in the ins and outs of the actual ML algorithms than what they can do for me.  So, once I had a basic understanding of Classification Systems I started hacking.  

In my earlier (and kludgier) efforts at ML I had to define success, and then measure current students against whatever benchmark I set (based on historical data).  That meant lots of manual code, repeated interrogation of the data, and scaling issues.

ML algorithms can help with all of these issues.


• Train a model on a data set.
• Test on a subset (if good continue).
• Use the model with live data to make predictions.
• Leverage those predictions to improve outcomes.
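In practice the first two steps are just a split of the labeled CSV before training.  Something like this works (a sketch: the 80/20 split point is arbitrary, the file name matches the example further down, and trainlogistic wants the header row kept on both pieces):

head -1 CJ101_data.csv > header.csv
tail -n +2 CJ101_data.csv | shuf > shuffled.csv
head -n 3400 shuffled.csv | cat header.csv - > train.csv
tail -n +3401 shuffled.csv | cat header.csv - > test.csv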


I'm currently leaning on logistic regression, but there are other algorithms in the toolshed depending on your problem set.
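For reference, the hypothesis logistic regression fits is just a sigmoid over a weighted sum of the features; in standard notation (with my feature names plugged in, as a sketch of the usual form) it is:

$$ h_\theta(x) = \frac{1}{1 + e^{-\theta^T x}} = \frac{1}{1 + e^{-(\theta_0 + \theta_1\,\mathrm{pct} + \theta_2\,\mathrm{mins} + \theta_3\,\mathrm{days} + \theta_4\,\mathrm{subs})}} $$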




Think of my data as a giant matrix (well, at least the numeric fields) that gets plugged into the above equation.  Each line in the matrix is a training example.  The "features" I use are "predictors" in Mahout parlance, and the classifier is the "target".

So I took the following snapshot of data from CJ101 classes in 2012, covering all student activity through day 14, and appended the outcomes (grade, success).  We call these our training examples (~4200 used here, ~200K in total for 2012).

username,class,term,section,pct,mins,subs,days,grade,success …
...
TheBeaver,CJ101,1201A,14,0.97,12860.97,235,40,A,1 
WardCleaver,CJ101,1202B,05,0.86,2930.28,90,20,F,0
JuneCleaver,CJ101,1202C,16,0.94,6432.99,124,28,C,1
EddieHaskel,CJ101,1201C,03,1.0,7257.34,96,40,A,1
WallyCleaver,CJ101,1202B,18,0.81,29736.2,462,54,A,1
LumpyRutheford,CJ101,1201A,11,0.78,11685.95,308,48,C,1
WhiteyWitney,CJ101,1201A,03,0.96,37276.78,415,63,A,1
LarryMondello,CJ101,1202C,10,0.73,7223.47,150,22,F,0
MissLanders,CJ101,1204A,04,0.98,13717.4,292,57,B,1 
.. 

Here I set "success" to 1 for good student outcomes (A-C) and 0 for bad ones (D and below).  My goal is to train a model that uses my numeric features (pct, mins, days, subs) to predict the "success" value.

mahout trainlogistic --input CJ101_data.csv --target success --categories 2 --predictors pct mins days subs --types numeric --output ./model

success ~ 0.001*Intercept Term + 0.009*days + -0.001*mins + 0.000*pct + 0.022*subs       
Intercept Term 0.00058                
days 0.00878                
mins -0.00119                 
pct 0.00041                
subs 0.02231

So Mahout picks the weighting for my features, which are used in the logistic regression, basically telling me how important each one of these terms is.  Looking at this particular class ... days and subs are much more important than mins or pct.

Now I need to test how good I was at predicting the success rate based on pct, mins, subs, and days.

mahout runlogistic --input CJ101_data.csv --model model --auc --confusion

AUC = 0.74
confusion: [[1594.0, 163.0], [16.0, 2.0]]
entropy: [[-0.0, -0.0], [-14.9, -2.6]]
12/11/14 15:32:14 INFO driver.MahoutDriver: Program took 844 ms 
* AUC (area under the ROC curve) measures how well the model separates successes from failures; 0.5 is a coin flip, 1.0 is perfect.

I can actually fiddle around and get this much higher (while making the predictions more usable as well), but let's save that for another post.  The long and short of it is that I can predict the outcomes of CJ101 students just 14 days into a 70-day class with an AUC of 0.74.
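The fiddling is mostly the trainlogistic knobs; the values here are illustrative, not what I ended up with:

mahout trainlogistic --input CJ101_data.csv --target success --categories 2 --predictors pct mins days subs --types numeric --features 20 --passes 100 --rate 50 --output ./model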

This information can not only identify struggling students very early in a class, but also be leveraged for automated interventions.

Not too shabby at all!






Thursday, September 27, 2012

Data Week - an exercise in humility

So I just spent a few days at Data Week in San Francisco, and was it ever eye opening.

I've always claimed that I have more complexity issues, tying together large data sets, than actual Big Data problems.  I run a smallish 8 node ~50GB EMR cluster at Amazon, processing a few hours daily.  My issues are generally around throughput, getting data in and out of my cluster in a reasonable time (say < 2hrs).

Then I watch the BigQuery guys from Google doing a full table scan on a 500TB table in a minute.  Ouch.

In session after session, numbers in the hundreds of terabytes or even petabytes were being geek-dropped, dwarfing anything I've seen here in our local Chicago Hadoop/BigData/Machine Learning groups.  There were interesting discussions just about moving that volume of information around.

What I liked most was the interest in extracting value (information) from data, not necessarily in creating the biggest pile.  I think the R folks (from Revolution Analytics) were spot on here.  I need to spend some time there.

It was also nice to see the lack of BI tool vendors pitching how their wares could "integrate" in some lame-ass way with a Hadoop cluster (JDBC anyone???).  That pitch sometimes infects Big Data gatherings and is insufferable.  So happy to avoid it.

A friend in sales recently asked ...

What Do BI Vendors Mean When They Say They Integrate With Hadoop?

My reply was "Bullshit. About 1/3 of what they tell you."





Wednesday, August 22, 2012

trending topics in Hive

<WARNING> I normally try to keep the Big Data discussions in this blog accessible to non-geeks; this one is anything but.  There is a lot of nasty HQL, terminology, etc.  I promise the next post will get back on track. </WARNING>

So I recently unearthed a bunch of discussions and stuck them in my Hive cluster (I'm adding about 4K a day).  I loved the n-gram examples in Hadoop: The Definitive Guide (which I loaned out and never got back!) and in the Amazon EMR docs.  Now I had a nice data set to work with!

Unfortunately all the Hive examples I've found either leveraged a publicly available n-gram set, or didn't really do anything with it.  Getting the n-grams from my Hive table is easy enough with a lifted Hive command and my table structure (table: discussions, column: description):
hive> SELECT explode(context_ngrams(sentences(lower(description)), array(null), 25)) AS word_map FROM discussions;
But unfortunately the results come back in a hideously ugly structure, full of words that don't tell us much.  I had some work to do ...
{"ngram":["the"],"estfrequency":82998.0}
{"ngram":["to"],"estfrequency":53626.0}
{"ngram":["and"],"estfrequency":51487.0}
{"ngram":["of"],"estfrequency":41579.0}
{"ngram":["a"],"estfrequency":37644.0}
{"ngram":["in"],"estfrequency":29572.0}
{"ngram":["i"],"estfrequency":27989.0}
{"ngram":["is"],"estfrequency":26268.0}
{"ngram":["that"],"estfrequency":23225.0}
{"ngram":["for"],"estfrequency":17557.0}
{"ngram":["be"],"estfrequency":14100.0}
{"ngram":["are"],"estfrequency":12911.0}
{"ngram":["with"],"estfrequency":12660.0}
{"ngram":["it"],"estfrequency":12618.0}
{"ngram":["this"],"estfrequency":12091.0}
{"ngram":["as"],"estfrequency":12023.0}
{"ngram":["have"],"estfrequency":11179.0}
{"ngram":["my"],"estfrequency":10446.0}
{"ngram":["or"],"estfrequency":10011.0}
{"ngram":["on"],"estfrequency":9602.0}
{"ngram":["you"],"estfrequency":9460.0}
{"ngram":["not"],"estfrequency":8521.0}
{"ngram":["they"],"estfrequency":7306.0}
{"ngram":["would"],"estfrequency":7093.0}
{"ngram":["can"],"estfrequency":7087.0}
It was clear I needed some sort of stop list to find anything interesting (83K "the"s don't tell me much), so I borrowed this one and put it into a new Hive table.

hadoop fs -mkdir /temp8/
hadoop fs -cp s3://XXX/stopwords.txt /temp8/.
hive -e  'drop table stop_words '
hive -e  'create table stop_words (word string) row format delimited fields terminated by "\t";
load data inpath "/temp8/" overwrite into table stop_words;'
Now I needed to get the data into a structure I could use.  After playing around, I was able to get the returned struct into a Hive table.  This was important because there are a lot of limitations on what you can do with Hive UDTFs (explode, etc.), plus it takes a minute or so to run the n-grams query.  Trust me, it sucks.

hive -e  'drop table trending_words_struct;'
hive -e  'create table trending_words_struct (NEW_ITEM ARRAY<STRUCT<ngram:array<string>, estfrequency:double>>);
INSERT OVERWRITE TABLE trending_words_struct
SELECT context_ngrams(sentences(lower(description)), array(null), 1000) as word_map FROM discussions;'


Next, I can unlock this mess of a structure and put it into something easier to query.

hive -e  'drop table trending_words;'
hive -e  'create table trending_words (ngram string, estfrequency double);'
hive -e 'INSERT OVERWRITE TABLE trending_words select X.ngram[0], X.estfrequency  from (select explode(new_item) as X from trending_words_struct) Z;';

Finally I can match against the whitelist:
select tw.ngram, tw.estfrequency from trending_words tw LEFT OUTER JOIN stop_words sw ON (tw.ngram = sw.word) WHERE sw.word is NULL order by tw.estfrequency DESC;
 ... And get a list of non-trivial words from my discussions!  Granted, I need to add stuff like can/will/etc to my stop list (see the snippet after the list), change to daily inspection, etc.  But you get the idea.

can 5096.0
will 4145.0
one 3018.0
also 2953.0
may 2397.0
children 2187.0
think 2088.0
people 2025.0
time 2007.0
use 1942.0
help 1912.0
behavior 1838.0
information 1755.0
research 1748.0
like 1673.0
work 1654.0
many 1650.0
http 1590.0
child 1552.0
make 1542.0
health 1538.0
2010 1518.0
well 1482.0
new 1481.0
used 1480.0
good 1439.0
different 1432.0
2012 1410.0
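Folding words like can/will/may into the stop list is just appending to the local file and reloading the table.  Assuming you kept a local copy of stopwords.txt around, and reusing the /temp8 path from above, it looks roughly like this:

echo -e "can\nwill\nmay\nwould" >> stopwords.txt
hadoop fs -rm /temp8/stopwords.txt
hadoop fs -put stopwords.txt /temp8/stopwords.txt
hive -e 'load data inpath "/temp8/" overwrite into table stop_words;'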
The key technical breakthroughs were:
#1 Deciding to just dump the results in their nasty structure into a Hive table
ARRAY<STRUCT<ngram:array<string>, estfrequency:double>> 
#2 Figuring out how to get those results into a usable table (that Z was a killer!)
select X.ngram[0], X.estfrequency from (select explode(new_item) as X from trending_words_struct) Z;
#3 Getting the outer JOIN right
select tw.ngram, tw.estfrequency from trending_words tw LEFT OUTER JOIN stop_words sw ON (tw.ngram = sw.word) WHERE sw.word is NULL order by tw.estfrequency DESC; 

Sadly, there was a lot of trial and error.  Dead ends were frequent, a Hive bug was encountered, etc.  It took me days to get this figured out, and I wanted to document the process in hopes others benefit from my mistakes.

The cool thing is this treatment will work for just about anything.  Have at it!


Wednesday, August 8, 2012

the death of BI

Business Intelligence (BI) processes have long been tied to Data Warehouses (DW).
  1. collect a bunch of data
  2. query it for insight
  3. hypothesize
  4. validate 
  5. test in the wild
  6. goto #1
Where BI processes fail is in speed.  This six-step process takes months in most environments, and requires lots of human intervention.  I'm not kidding.  Let's try to real-world this for a minute.

So say a business partner (BP) says they have an idea that students who read the syllabus do better in a class.  They come to the BI team, who tries to see if there is data in the warehouse on syllabus reading.  If not, they have to get it added, which is invariably a giant pain in the ass.  Next they need to map syllabus reading to outcomes (grades, completion, etc).  More pain and suffering.  Models are made, opinions formed, etc.

Now we have to do something about it.  

Say we make it a requirement or slap the student in the face with syllabus usage stats.  Then a deck is made with the findings supporting the thesis and business case for acting.  The deck is sold and prioritized in the development queue.  This now needs to be pushed live (in some capacity) and then tested and whatnot.  

The problem is the students who could have been helped by the process are gone, dropped out, failed, etc.  We "might" be able to help the next set of students, but not those who are already gone.  

Big Data solutions give us a near real time feedback loop by opening up access to analysis.

If we are already collecting and saving everything, there is no need to "get stuff added to the DW".  Win #1.  Since data is stored in an unstructured format, there is no need to do any transformations either.  Win #2.  I might need to bust out some SQL-fu, but that is all.  I have everything I need at my disposal.  Now here is the kicker: that giant bastion of knowledge is not locked up in some deep-storage DW with limited access (aka one way in, one way out).  It is in a live store that I have query access to, along with the ability to process and automate.  Win #3.

So now I can put the onus on the end user and have them do the work for me.  Simply query the store for statistics on syllabus reading (for a particular class, term, etc) and present it directly to the student.

"Students who read this IT193 syllabus <LINK> on average earn a .56 grade higher".

The feats of strength required to get this sort of thing done in a traditional BI/DW environment are Herculean.  They are not made for real time processing and presentation. They are made for spitting out reports and safeguarding data.

Now I'm not suggesting people go burn down the DWs and fire the BI folks.  This will be a gradual process over time, where Big Data solutions automate away the need for BI.