Monday, December 16, 2013

Hive vs Impala @ EMR

A colleague recently tipped me off that Impala arrived at Amazon's Elastic MapReduce (EMR).  I've been waiting for some time for this functionality, so I got started right away with a little benchmark.

Launching impala @ EMR is a breeze, just download the latest ruby client and replace "hive-interactive" with "impala-interative" in your EMR creation script.  I also followed the instructions to use the most recent AMI.

/home/ec2-user/emr2/elastic-mapreduce --create --alive --impala-interactive --name "Dev Impala - 1 hs1.8xlarge, 3 m2.4xlarge"  --master-instance-type "hs1.8xlarge" --slave-instance-type "m2.4xlarge" --num-instances 4 --ami-version 3.0.2 --region us-east-1 | awk -F' ' '{print $4}' > impala_master

Here is my benchmarking code ....

### impala test ###
impala_start=$(date +"%s")

hadoop fs -mkdir /data/jobs/
hadoop distcp s3n://XXXXXXXXXX/csvfeed/jobs/ /data/
hive -e 'drop table jobs'
hive -e 'create external table jobs (id string, title string, post_date string, job_board_url string, company string, city string, state string, description string, industry string, price string, update_date string, feed_date string, job_board string, hash string) row format delimited fields terminated by "\t" LOCATION "/data/jobs/";'
impala-shell -r --query="select count(1) from jobs";
impala_end=$(date +"%s")


### hive test ###
hive_start=$(date +"%s")

hadoop fs -mkdir /temp20/
hadoop distcp s3n://XXXXXXXXXX/csvfeed/jobs/ /temp20/
hive -e 'drop table jobs'
hive -e 'create table jobs (id string, title string, post_date string, job_board_url string, company string, city string, state string, description string, in
dustry string, price string, update_date string, feed_date string, job_board string, hash string) row format delimited fields terminated by "\t";
load data inpath "/temp20/jobs/" overwrite into table jobs;'
hive -e 'select count(1) from jobs';
hive_end=$(date +"%s")


diff=$(($impala_end-$impala_start))
echo "Impala - $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed."

diff=$(($hive_end-$hive_start))

echo "Hive - $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed."


And here is my output .... Not too shabby.

I loaded about 130GB of data, and then counted the 1387230376 rows.  Now some of the performance gains are likely due to the use of an externally managed Hive table (needed so Impala can see the Hive tables), but the actual query is darn near instantaneous in Impala.


Impala - 8 minutes and 50 seconds elapsed.
Hive - 20 minutes and 2 seconds elapsed.


Of course, Impala and Hive are great for different use cases and leaving Hive's internally managed cocoon is not for every situation.  I'm still debating the ramifications of this on our product, but it is going to be hard to argue with the performance gains.

Hope this was useful!



Thursday, November 21, 2013

tf-idf in hive, with lessons

Tf-idf is a great way to measure relevancy.  

In my recent work, I am concerned with surfacing skills from job descriptions.  First, I n-gram all the JDs (1.25B rows).  Next, I match them against a know list of skills (10K or so).  Finally, I count them up and created a tf-idf relevancy score.  

It took a few tries to get this going at scale in Hive.  I was running into an issue where I'd get all the way to 100% mapped, and then get stuck at 70% reduced.  A candidate I was interviewing let me know it was likely running out of disk space while writing the result set (the EMR logs weren't very helpful).  

Being a lazy developer, I tried larger node footprints (and even partitions) but was still getting nowhere fast.

So I decided to split up my hive query into a number of steps.  The seemingly innocuous joining to the whole jobs table for a simple count (30M rows) and the creating of a temp table with everything but that before doing the tf-idf calc were the keys to scaling this.

I am now processing the following every morning complete with a tf-idf score.  I hope this is helpful!

jobs: 45,694,724
active jobs: 1,619,206
----------
1w active job/skills matches: 22,932,796 avg: 14.49
2w active job/skills matches: 8,806,166 avg: 6.62
3w active job/skills matches: 506,809 avg: 1.28

total job ngrams reviewed: 1,272,560,565
----------
total 1w ngrams reviewed: 297,779,164
total 2w ngrams reviewed: 475,109,902
total 3w ngrams reviewed: 499,671,499


# get the number of jobs
hive -e 'drop table job_count;'
hive -e 'create table job_count(cnt int);
INSERT OVERWRITE TABLE job_count
select count(distinct hash) as cnt FROM todays_jobs';

# coalesce by 1/2/3w n-gram matches
hive -e 'drop table all_job_skills_match;'
hive -e 'create table all_job_skills_match (hash string, company string, city string, title string, lay_skill string, ngram array<string>,  estfrequency double);
INSERT OVERWRITE TABLE all_job_skills_match

SELECT * from (

SELECT * from (
select * from 1w_job_skills_match UNION ALL
select * from 2w_job_skills_match
) XXX

UNION ALL

select * from 3w_job_skills_match) YYY

;'

# count how many job descriptions each skill shows up in
hive -e 'drop table job_skills_count'
hive -e 'create table job_skills_count(ngram array<string>, count int);
INSERT OVERWRITE TABLE job_skills_count
select ngram, count(ajsm.hash) from all_job_skills_match ajsm group by ngram;'

# create a table with all job stats, but NOT joined to the total number of jobs
hive -e 'drop table todays_job_stats';
hive -e 'create table todays_job_stats (hash string, company string, city string, title string, lay_skill string, ngram array<string>, estfrequency int, job_matches int);
INSERT OVERWRITE TABLE todays_job_stats
select jsm.hash, jsm.company, jsm.city, jsm.title, jsm.lay_skill, jsm.ngram, jsm.estfrequency, jsc.count
FROM job_skills_count jsc 
JOIN all_job_skills_match jsm
ON (jsc.ngram = jsm.ngram)
GROUP BY jsm.hash, jsm.company, jsm.city, jsm.title, jsm.lay_skill, jsm.ngram, jsm.estfrequency, jsc.count '

# finally run the tf-idf calc by joining to total number of jobs
hive -e 'select hash, company, city, title, lay_skill, ngram, estfrequency, round((estfrequency * log10(cnt/job_matches)),2) from todays_job_stats JOIN j
ob_count' > output.txt

Friday, August 30, 2013

Real Time Intelligence talk

I gave the a talk last week on moving from Analytics to Action at the Gateway Analytics Network in Chicago.  It was great fun, and the audience was widely varied ... from product managers, to analytics folks, and data scientists.  I got some excellent feedback, and was able to take in some great talks as well.  I'm particularly excited about what Beyond Core is up to.

My presentation deck is linked here.

I'll be rehashing my talk in November at IIT's Stuart School of Business Friday Research Presentation Series if you would like to come and heckle.

Tuesday, August 13, 2013

Scaling Hive Row Specific N-Grams

I've been doing a lot of NLP (Natural Language Processing) work in Hive of late, and earlier posted about the excellent n-gram utilities available out of the box.

That said, there are some scaling issues.

I was running into problems with this code, it ran on a few hundred rows but I was getting heap overflow errors in Hive when I ran it on more than 1000.  Now, you could up the heap size, but that isn't really addressing the problem.  Plus, I needed it to run on over 100K rows.

# this is fine, pulls in daily jobs, 85K or so for 08/01
hive -e 'drop table todays_jobs;'
hive -e 'create table todays_jobs(id string, title string, description string);
INSERT OVERWRITE TABLE todays_jobs
SELECT id, title, description from jobs
WHERE substring(post_date,2,10)="2013-08-01"
GROUP BY id, title, description;' 
## this works on 1000 jd's ... pukes on 10000 ##
hive -e 'drop table job_skills;'
hive -e 'create table job_skills(id string, title string, NEW_ITEM ARRAY<STRUCT<ngram:array<string>, estfrequency:double>>);
INSERT OVERWRITE TABLE job_skills
SELECT id, title, context_ngrams(sentences(lower(description)), array(null), 100) as word_map
FROM todays_jobs
GROUP BY by id, title;'

I found the Hive documentation lacking, so as a public service am posting my code with some comments.  Notice that I added a fourth item in the select clause (substring(title,0,2)), even though I only declare three fields while creating the table.  This is where the partition value goes.  Normally a date would be great to use here ... but in my case they were all the same so I had to use look elsewhere.
# I create partitions based on the first two characters of the title
# this gave me 683 partitions for 08/01 ... which slowed down the MapReduce process
# but allowed me to complete the ngram processing in the next step 
hive -e 'create table todays_jobs(id string, description string, title string) PARTITIONED BY (part string);
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
INSERT OVERWRITE TABLE todays_jobs PARTITION(part)
SELECT id, description, title, substring(title,0,2) from jobs
WHERE substring(post_date,2,10)="2013-08-01"
GROUP BY id, description, title, substring(title,0,2)
;'
# now works on 80K+ rows ... extracting 100 1 word ngrams per row
hive -e 'drop table ngram_skills;'
hive -e 'create table ngram_skills(id string, title string, NEW_ITEM ARRAY<STRUCT<ngram:array<string>, estfrequency:double>>);
INSERT OVERWRITE TABLE ngram_skills
SELECT id, title, context_ngrams(sentences(lower(description)), array(null), 100) as word_map
FROM todays_jobs
group by id, title;'
What I end up here is a a table with jobs details (id and title) and an n-gram of the description.  This makes for very easy matching (once I explode the ngram which is pretty damn cryptic).

hive -e  'drop table trending_words;'
hive -e  'create table trending_words (id string, title string, ngram array<string>,  estfrequency double);
INSERT OVERWRITE TABLE trending_words
SELECT id, title, X.ngram, X.estfrequency from ngram_skills LATERAL VIEW explode(new_item) Z as X;' 
hive -e 'select tw.id, tw.title, tw.ngram, tw.estfrequency FROM trending_words tw JOIN 1w_taxonomy tax ON (tw.ngram[0] = lower(tax.title)) where tax.title<>"title" order by tw.id, tw.estfrequency DESC limit 1000;'
 And I get output like this!!!!
439235503 "Director of Marketing Kindred Central Dakotas" ["marketing"] 9.0
439235503 "Director of Marketing Kindred Central Dakotas" ["sales"] 5.0
439235503 "Director of Marketing Kindred Central Dakotas" ["leadership"] 3.0
439235503 "Director of Marketing Kindred Central Dakotas" ["training"] 2.0 
439235505 "Certified Nursing Assistant I / Full Time" ["supervision"] 1.0
439235505 "Certified Nursing Assistant I / Full Time" ["collaboration"] 1.0 
439235506 "CNA - Evenings, Full Time Part Time - Wareham, MA" ["collaboration"] 1.0
439235506 "CNA - Evenings, Full Time Part Time - Wareham, MA" ["supervision"] 1.0
439235507 "CNA - Full Time Part Time, Days - Wareham, MA" ["collaboration"] 1.0
439235507 "CNA - Full Time Part Time, Days - Wareham, MA" ["supervision"] 1.0


Friday, July 5, 2013

variety over volume

When embarking on a Big Data effort, it almost always pays go for variety first.  While volume and velocity may be sexy, variety often makes data sets usable.


Now you can do all sorts of Machine Learning / Math to go and prove whether you should be going wide (adding variety) or deep (adding volume), but I'd like to offer a much more practical example.

Earlier I've written about our "Voice of the Customer" Pilot.  We are basically polling all our students who call in (or who we call).  We've had a tremendous response, but selling the data internally has been a bit of a slog because we tied it all together manually in a cumbersome and irregular process.

So I recently took the time to add the survey results to our Hadoop Cluster.

Somehow I convinced Wufoo (where our surveys live) to give me an integration key (a giant PITA), then leveraged the excellent WuParty ruby gem to download the survey results, which I drop in a tab delimited file and ship to S3 (where my EMR cluster picks them up for processing).

This is like 100K of data a day.  Certainty not big.

But with this added data source (aka additional variety) I am able to tie the actual calls to the survey results.  I throw this together in a delimited file which our BI team picks up from S3 and leverages to build reports for the CC managers and departments.


This is damn near real time feedback on our call centers, and it is accessible.
hive -e 'select fb.username, fb.email, fb.interaction_id, fb.department, call.department, call.resource_name, fb.satisfied, fb.ease, fb.recommend, fb.comments, fb.created_date, call.l1_name, call.l2_name, call.l3_name, call.l4_name from feedback fb JOIN calls call ON (fb.interaction_id=call.interaction_id) WHERE resource_name RLIKE "^[A-Za-z]" AND call.l1_name IS NOT NULL and call.l1_name <> "" GROUP BY fb.username, fb.email, fb.interaction_id, fb.department, call.resource_name, call.department, fb.satisfied, fb.ease, fb.recommend, fb.comments, fb.created_date, call.l1_name, call.l2_name, call.l3_name, call.l4_name;' > agent_feedback.csv
So what I've done here, is add almost no volume to our cluster ... but deliver a ton of value with variety.

And the survey results are not a one trick pony, now that I have this data coming into my cluster I can leverage it new and unthought of ways.  How about looking at student outcomes and CSAT scores?  Can falling CSAT predict bad outcomes?  How about the opposite?

Good questions start to arise only when we have the data to ask them.









Wednesday, April 3, 2013

Act before you think?


If someone asks me what Big Data buys you, I almost always reply with "speed".

Big Data solutions allow us to act rather than think.  As an innovation guy and a follower of Lean Startup model, I am all about speed.  Fail fast forward, learn you way, etc.  This is the polar opposite of most BI and business processes.  I'm not interested in models that explain what happened after the fact, but in tools that allow me to shape outcomes.

I was recently approached by a business partner, looking to solicit feedback on our call centers.

As chance would have it, we were in the final stages of getting our call center data (Genesys) into our data store (S3) and Hadoop cluster (EMR).  As is normally the case, we weren't really sure what we'd do with it ... just that it would be interesting to have.

Anyhow, after a bit of fiddling we were able to tie the call logs (collected every 15 minutes) to our student records and had an avenue for polling our students.  Our business partners were thrilled.  They had our call center team working on popping voice call backs, but the project was flailing.  Even more, we were skeptical that we'd get good interactivity by 1) asking for call back permission and then 2) calling students back with a phone survey.

So we got going, literally just putting together a daily csv that we sent to someone to manually email links out.  Next we automated the pushing of the "survey list" to our outbound mail provider.  Feedback was coming back, in droves.  On top of emailing, we started pushing the survey links directly to the activity streams of students (on our Social Portal).  It turns out these students were over 2x as likely to complete surveys.  Finally we started survey students who we had called (outbound calls), "did we call at a good time", etc.

The voice callback survey project I mentioned earlier is still not live, frankly it is already irrlevant.

Please note, we did absolutely no analysis before we stated polling surveys.  We made no predictions, and had no expectations.  At the last minute, almost on a lark, our business partner suggested a free form text feedback block.  So much good feedback started coming back we had to enlist a "fixer" to deal with student issues.

We are literally "saving" students who were ready to throw in the towel, call the BBB, etc.

All because we didn't wait.  Having put the data together in an operation-able format, we were able to get out the door in a few days and then iterate and improve.  Next we are going to pull in the survey results and use them to create a real time success dashboard.  Which departments, teams, advisors are doing well?

We may need to revisit the old adage, "think before you act".

Thursday, February 7, 2013

amassing data - beg, borrow, steal

In an ideal world, all the data you need would be readily accessible.

Unfortunately, we don't live in that world. Data is often hidden from view, buried, and hoarded.  Sometimes on purpose, and sometimes for perfectly valid reasons.

The role of a Big Data technologist more American Picker than archeologist.  Be honest with yourself, you probably don't know exactly what looking for nor do you have time to figure it out.  You'll know something cool when you see it.  Make some reasonable assumptions of what you'd like to have and get after it.

You can store 1TB per year for around $600 at Amazon's S3, 10% of that if you put it in their long term storage (Glacier).  Consider storage costs as damn near free, don't be afraid to start piling stuff up.

The following is my guide to getting data en masse.


Beg (good)
  • Ask for it.  
  • Make this as painless as possible on all parties (go direct). 
  • Be thankful and courteous, give props where they are do.  

Borrow (better)
  • Repurpose others' data.
  • Don't wait for any changes, additions, formatting, etc.  DIY.
  • Pulling data now is much better than waiting for someone to deliver it.  Get going.

Steal (best)
  • Find out where data lives and go fishing.  Poke around.  
  • Better to plead forgiveness than ask permission.
  • If you have access, you probably aren't breaking any rules.

Wash, rinse, repeat.

At any given time you'll probably be doing all of these simultaneously.  My only caveat is "don't store junk".  I remove non-Ascii characters and validate that files are complete.  But don't do much more than that.

Reserve the right to rethink any decisions on what to keep by not deleting anything.

Once you get used to working like this, you will be amazed at how much you can acquire relatively painlessly.  Some things will take hours, and some unfortunately years.  Some things you'll need ASAP, and some you'll have no idea what to do with (I started collecting some activity data 7 months ago that I never used until yesterday).  Who knows?

Take what you can get now, and iterate.  Good things will follow.









Monday, January 7, 2013

Why I switched to MapR and love EMR ...

A few months back I was approached by a colleague interested in using Hadoop/Hive to power our Tableau clients.  We played around for a bit and learned, the hard way, they the only way were were going to get this to go easily was switching to a MapR distribution.

Fortunately, EMR (Elastic MapReduce) makes this a snap.

./elastic-mapreduce --create --alive --hive-interactive --name "Hive Dev Flow" --instance-t
ype m1.large --num-instances 8 --hadoop-version 1.0.3 --hive-versions 0.8.1.6 --ami-version 2.3.0 --with-supported-products mapr-m3 

Literally, you just need to add the parameter (--with-supported-products mapr-m3).

Next you need to open up 8453 (and possibly some others) on your Hadoop master at EC2 (in Security Groups), and then run the MapR ODBC connector tool.  Now you are ready to rock (with any ODBC Hadoop client).  One noted caveat is that you can't run Gangila for stats, but as I noted in the EMR forum you can use the MapR Control Center to the same effect.

Now I have no idea if I will stick with MapR, but the ease of switching between Hadoop distributions, AMIs, versions, etc with EMR is pretty slick.  It did take a some trial and error to figure out a working Hadoop/Hive/AMI version combo ... but nothing too crazy.

I know things like Apache WHIRR are making strides, but for now I am very happy with EMR.

Peace,
Tom