Wednesday, April 2, 2014

a workable Neo4J config @ AWS

A recent problem set has just been screaming for a graph db (large number of nodes, massive number of edges, shortest path traversal).

After a bit of research we decided that it was best to go with Neo4J over the Hadoop-based options (Giraph, etc.), at least to get started.  Our rationale was as follows:
  • No one on our team has any graph db experience.
  • The Neo4J community is way more active than its Hadoop-based counterparts.
  • We already run a pretty heterogeneous stack (R Studio/Server, Hive, Impala, Python).
  • The ability to show the graph off visually is somewhat important.
  • Our graph db won't see frequent updates, and usage will be minimal.
  • Hydrating the graph from our MySQL db via Python was pretty trivial.
  • Neo4J offers very easy REST API access.

So I went about getting Neo4J going at AWS; here is a blow-by-blow of how I got it done.

Don't follow the main Neo4J EC2 instructions (utilizing CloudFormation); it was hosed trying to locate the AMI.  Go manual, you will thank me.
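For reference, going manual basically means installing Neo4J's Debian package on an Ubuntu instance.  Here is a rough sketch, assuming the repo URL and signing key Neo4j published for its Debian packages at the time (double-check these against the current docs):

# rough sketch of the manual install on an Ubuntu EC2 instance
# (the repo URL and key below are assumptions; verify against Neo4j's install docs)
wget -O - https://debian.neo4j.org/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.org/repo stable/' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt-get update && sudo apt-get install neo4j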


Notes / extra steps:
  • Neo4J will be installed in /var/lib/neo4j/
  • Neo4J start script is /etc/init.d/neo4j-service (stop/start/restart) ... this is incorrectly noted in the docs
  • Edit /etc/security/limits.conf and add these two lines:
neo4j   soft    nofile  40000
neo4j   hard    nofile  40000
  • Neo4J is only accessible locally by default:
Edit /var/lib/neo4j/conf/neo4j-server.properties
uncomment this line
org.neo4j.server.webserver.address=0.0.0.0
  • To make those nofile limits actually apply when switching to the neo4j user:
Edit /etc/pam.d/su
uncomment or add the following line
session    required   pam_limits.so
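A quick check that the new limits took (assuming the service runs as the neo4j user, whose login shell may be disabled, hence the explicit -s):

sudo su - neo4j -s /bin/bash -c 'ulimit -Hn; ulimit -Sn'   # both should report 40000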
That should get you up and running, and accessible.  Here is a picture of a graph we created based on Wikipedia categories and pages centered on "Machine Learning".  We are just getting our feet wet, but love what we are seeing!
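One last verification step: once port 7474 is open in the instance's security group, a quick Cypher call over the REST API confirms the box is reachable from outside.  A minimal sketch, assuming a 2.x server with its transactional endpoint and no auth in front of it (the hostname is a placeholder):

curl -s -H "Content-Type: application/json" \
     -X POST http://YOUR-EC2-PUBLIC-DNS:7474/db/data/transaction/commit \
     -d '{"statements":[{"statement":"MATCH (n) RETURN count(n)"}]}'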










Monday, December 16, 2013

Hive vs Impala @ EMR

A colleague recently tipped me off that Impala arrived at Amazon's Elastic MapReduce (EMR).  I've been waiting for some time for this functionality, so I got started right away with a little benchmark.

Launching Impala @ EMR is a breeze: just download the latest ruby client and replace "hive-interactive" with "impala-interactive" in your EMR creation script.  I also followed the instructions to use the most recent AMI.

/home/ec2-user/emr2/elastic-mapreduce --create --alive --impala-interactive --name "Dev Impala - 1 hs1.8xlarge, 3 m2.4xlarge"  --master-instance-type "hs1.8xlarge" --slave-instance-type "m2.4xlarge" --num-instances 4 --ami-version 3.0.2 --region us-east-1 | awk -F' ' '{print $4}' > impala_master
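(The awk at the end just grabs the fourth whitespace-delimited field of the client's response, which is the job flow ID, and stashes it in the impala_master file for later use.)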

Here is my benchmarking code ....

### impala test ###
impala_start=$(date +"%s")

hadoop fs -mkdir /data/jobs/
hadoop distcp s3n://XXXXXXXXXX/csvfeed/jobs/ /data/
hive -e 'drop table jobs'
hive -e 'create external table jobs (id string, title string, post_date string, job_board_url string, company string, city string, state string, description string, industry string, price string, update_date string, feed_date string, job_board string, hash string) row format delimited fields terminated by "\t" LOCATION "/data/jobs/";'
impala-shell -r --query="select count(1) from jobs";
impala_end=$(date +"%s")


### hive test ###
hive_start=$(date +"%s")

hadoop fs -mkdir /temp20/
hadoop distcp s3n://XXXXXXXXXX/csvfeed/jobs/ /temp20/
hive -e 'drop table jobs'
hive -e 'create table jobs (id string, title string, post_date string, job_board_url string, company string, city string, state string, description string, industry string, price string, update_date string, feed_date string, job_board string, hash string) row format delimited fields terminated by "\t";
load data inpath "/temp20/jobs/" overwrite into table jobs;'
hive -e 'select count(1) from jobs';
hive_end=$(date +"%s")


diff=$(($impala_end-$impala_start))
echo "Impala - $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed."

diff=$(($hive_end-$hive_start))

echo "Hive - $(($diff / 60)) minutes and $(($diff % 60)) seconds elapsed."


And here is my output .... Not too shabby.

I loaded about 130GB of data, and then counted the 1387230376 rows.  Now some of the performance gains are likely due to the use of an externally managed Hive table (needed so Impala can see the Hive tables), but the actual query is darn near instantaneous in Impala.


Impala - 8 minutes and 50 seconds elapsed.
Hive - 20 minutes and 2 seconds elapsed.
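One gotcha worth noting: the -r flag on impala-shell above refreshes Impala's view of the Hive metastore at connect time.  If you create or reload tables in Hive while an Impala session is already running, you need to refresh by hand, roughly like this (statement support depends a bit on the Impala version the AMI ships):

impala-shell --query="invalidate metadata"   # reload the full catalog from the Hive metastore
impala-shell --query="refresh jobs"          # or just refresh the one table
impala-shell --query="select count(1) from jobs"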


Of course, Impala and Hive are great for different use cases and leaving Hive's internally managed cocoon is not for every situation.  I'm still debating the ramifications of this on our product, but it is going to be hard to argue with the performance gains.

Hope this was useful!



Thursday, November 21, 2013

tf-idf in hive, with lessons

Tf-idf is a great way to measure relevancy.  

In my recent work, I am concerned with surfacing skills from job descriptions.  First, I n-gram all the JDs (1.25B rows).  Next, I match them against a known list of skills (10K or so).  Finally, I count them up and create a tf-idf relevancy score.

It took a few tries to get this going at scale in Hive.  I was running into an issue where I'd get all the way to 100% mapped, and then get stuck at 70% reduced.  A candidate I was interviewing let me know it was likely running out of disk space while writing the result set (the EMR logs weren't very helpful).  

Being a lazy developer, I tried larger node footprints (and even partitions) but was still getting nowhere fast.

So I decided to split my Hive query into a number of steps.  The keys to scaling this were precomputing the simple count over the whole jobs table (30M rows) into its own tiny table, and building a temp table with everything else, so the tf-idf calc is just a final join between the two.  The full set of steps is below.

I am now processing the following every morning complete with a tf-idf score.  I hope this is helpful!

jobs: 45,694,724
active jobs: 1,619,206
----------
1w active job/skills matches: 22,932,796 avg: 14.49
2w active job/skills matches: 8,806,166 avg: 6.62
3w active job/skills matches: 506,809 avg: 1.28

total job ngrams reviewed: 1,272,560,565
----------
total 1w ngrams reviewed: 297,779,164
total 2w ngrams reviewed: 475,109,902
total 3w ngrams reviewed: 499,671,499


# get the number of jobs
hive -e 'drop table job_count;'
hive -e 'create table job_count(cnt int);
INSERT OVERWRITE TABLE job_count
select count(distinct hash) as cnt FROM todays_jobs';

# coalesce by 1/2/3w n-gram matches
hive -e 'drop table all_job_skills_match;'
hive -e 'create table all_job_skills_match (hash string, company string, city string, title string, lay_skill string, ngram array<string>,  estfrequency double);
INSERT OVERWRITE TABLE all_job_skills_match

SELECT * from (

SELECT * from (
select * from 1w_job_skills_match UNION ALL
select * from 2w_job_skills_match
) XXX

UNION ALL

select * from 3w_job_skills_match) YYY

;'

# count how many job descriptions each skill shows up in
hive -e 'drop table job_skills_count'
hive -e 'create table job_skills_count(ngram array<string>, count int);
INSERT OVERWRITE TABLE job_skills_count
select ngram, count(ajsm.hash) from all_job_skills_match ajsm group by ngram;'

# create a table with all job stats, but NOT joined to the total number of jobs
hive -e 'drop table todays_job_stats';
hive -e 'create table todays_job_stats (hash string, company string, city string, title string, lay_skill string, ngram array<string>, estfrequency int, job_matches int);
INSERT OVERWRITE TABLE todays_job_stats
select jsm.hash, jsm.company, jsm.city, jsm.title, jsm.lay_skill, jsm.ngram, jsm.estfrequency, jsc.count
FROM job_skills_count jsc 
JOIN all_job_skills_match jsm
ON (jsc.ngram = jsm.ngram)
GROUP BY jsm.hash, jsm.company, jsm.city, jsm.title, jsm.lay_skill, jsm.ngram, jsm.estfrequency, jsc.count '

# finally run the tf-idf calc by joining to total number of jobs
hive -e 'select hash, company, city, title, lay_skill, ngram, estfrequency, round((estfrequency * log10(cnt/job_matches)),2) from todays_job_stats JOIN job_count' > output.txt
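To make the scoring concrete with some made-up numbers: a skill with an estfrequency of 5 in a given JD that shows up in 100,000 of 45,694,724 total jobs scores roughly 5 * log10(45694724/100000) ≈ 5 * 2.66 ≈ 13.3, while a skill that appears in nearly every JD scores close to zero.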

Friday, August 30, 2013

Real Time Intelligence talk

I gave a talk last week on moving from Analytics to Action at the Gateway Analytics Network in Chicago.  It was great fun, and the audience was widely varied ... from product managers to analytics folks to data scientists.  I got some excellent feedback, and was able to take in some great talks as well.  I'm particularly excited about what Beyond Core is up to.

My presentation deck is linked here.

I'll be rehashing my talk in November at IIT's Stuart School of Business Friday Research Presentation Series if you would like to come and heckle.

Tuesday, August 13, 2013

Scaling Hive Row Specific N-Grams

I've been doing a lot of NLP (Natural Language Processing) work in Hive of late, and earlier posted about the excellent n-gram utilities available out of the box.

That said, there are some scaling issues.

I was running into problems with the code below: it ran on a few hundred rows, but I was getting heap overflow errors in Hive when I ran it on more than 1000.  Now, you could up the heap size, but that isn't really addressing the problem.  Plus, I needed it to run on over 100K rows.

# this is fine, pulls in daily jobs, 85K or so for 08/01
hive -e 'drop table todays_jobs;'
hive -e 'create table todays_jobs(id string, title string, description string);
INSERT OVERWRITE TABLE todays_jobs
SELECT id, title, description from jobs
WHERE substring(post_date,2,10)="2013-08-01"
GROUP BY id, title, description;' 
## this works on 1000 jd's ... pukes on 10000 ##
hive -e 'drop table job_skills;'
hive -e 'create table job_skills(id string, title string, NEW_ITEM ARRAY<STRUCT<ngram:array<string>, estfrequency:double>>);
INSERT OVERWRITE TABLE job_skills
SELECT id, title, context_ngrams(sentences(lower(description)), array(null), 100) as word_map
FROM todays_jobs
GROUP BY id, title;'

I found the Hive documentation lacking, so as a public service I am posting my code with some comments.  Notice that I added a fourth item in the select clause (substring(title,0,2)), even though I only declare three fields while creating the table.  This is where the partition value goes.  Normally a date would be great to use here ... but in my case they were all the same, so I had to look elsewhere.
# I create partitions based on the first two characters of the title
# this gave me 683 partitions for 08/01 ... which slowed down the MapReduce process
# but allowed me to complete the ngram processing in the next step 
hive -e 'create table todays_jobs(id string, description string, title string) PARTITIONED BY (part string);
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
INSERT OVERWRITE TABLE todays_jobs PARTITION(part)
SELECT id, description, title, substring(title,0,2) from jobs
WHERE substring(post_date,2,10)="2013-08-01"
GROUP BY id, description, title, substring(title,0,2)
;'
# now works on 80K+ rows ... extracting 100 1 word ngrams per row
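# note on the context_ngrams call below: the array(null) argument is a one-word wildcard
# (a single null means "estimate the top single words"), and 100 caps the list at the top 100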
hive -e 'drop table ngram_skills;'
hive -e 'create table ngram_skills(id string, title string, NEW_ITEM ARRAY<STRUCT<ngram:array<string>, estfrequency:double>>);
INSERT OVERWRITE TABLE ngram_skills
SELECT id, title, context_ngrams(sentences(lower(description)), array(null), 100) as word_map
FROM todays_jobs
group by id, title;'
What I end up with here is a table with job details (id and title) and an n-gram map of the description.  This makes for very easy matching (once I explode the ngram, which is pretty damn cryptic).

hive -e  'drop table trending_words;'
hive -e  'create table trending_words (id string, title string, ngram array<string>,  estfrequency double);
INSERT OVERWRITE TABLE trending_words
SELECT id, title, X.ngram, X.estfrequency from ngram_skills LATERAL VIEW explode(new_item) Z as X;' 
hive -e 'select tw.id, tw.title, tw.ngram, tw.estfrequency FROM trending_words tw JOIN 1w_taxonomy tax ON (tw.ngram[0] = lower(tax.title)) where tax.title<>"title" order by tw.id, tw.estfrequency DESC limit 1000;'
 And I get output like this!!!!
439235503 "Director of Marketing Kindred Central Dakotas" ["marketing"] 9.0
439235503 "Director of Marketing Kindred Central Dakotas" ["sales"] 5.0
439235503 "Director of Marketing Kindred Central Dakotas" ["leadership"] 3.0
439235503 "Director of Marketing Kindred Central Dakotas" ["training"] 2.0 
439235505 "Certified Nursing Assistant I / Full Time" ["supervision"] 1.0
439235505 "Certified Nursing Assistant I / Full Time" ["collaboration"] 1.0 
439235506 "CNA - Evenings, Full Time Part Time - Wareham, MA" ["collaboration"] 1.0
439235506 "CNA - Evenings, Full Time Part Time - Wareham, MA" ["supervision"] 1.0
439235507 "CNA - Full Time Part Time, Days - Wareham, MA" ["collaboration"] 1.0
439235507 "CNA - Full Time Part Time, Days - Wareham, MA" ["supervision"] 1.0


Friday, July 5, 2013

variety over volume

When embarking on a Big Data effort, it almost always pays to go for variety first.  While volume and velocity may be sexy, variety is often what makes data sets usable.


Now you can do all sorts of Machine Learning / Math to go and prove whether you should be going wide (adding variety) or deep (adding volume), but I'd like to offer a much more practical example.

Earlier I've written about our "Voice of the Customer" Pilot.  We are basically polling all our students who call in (or who we call).  We've had a tremendous response, but selling the data internally has been a bit of a slog because we tied it all together manually in a cumbersome and irregular process.

So I recently took the time to add the survey results to our Hadoop Cluster.

Somehow I convinced Wufoo (where our surveys live) to give me an integration key (a giant PITA), then leveraged the excellent WuParty ruby gem to download the survey results, which I drop in a tab delimited file and ship to S3 (where my EMR cluster picks them up for processing).

This is like 100K of data a day.  Certainly not big.

But with this added data source (aka additional variety) I am able to tie the actual calls to the survey results.  I throw this together in a delimited file which our BI team picks up from S3 and leverages to build reports for the CC managers and departments.


This is damn near real time feedback on our call centers, and it is accessible.
hive -e 'select fb.username, fb.email, fb.interaction_id, fb.department, call.department, call.resource_name, fb.satisfied, fb.ease, fb.recommend, fb.comments, fb.created_date, call.l1_name, call.l2_name, call.l3_name, call.l4_name from feedback fb JOIN calls call ON (fb.interaction_id=call.interaction_id) WHERE resource_name RLIKE "^[A-Za-z]" AND call.l1_name IS NOT NULL and call.l1_name <> "" GROUP BY fb.username, fb.email, fb.interaction_id, fb.department, call.resource_name, call.department, fb.satisfied, fb.ease, fb.recommend, fb.comments, fb.created_date, call.l1_name, call.l2_name, call.l3_name, call.l4_name;' > agent_feedback.csv
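The last hop is just dropping that file where the BI team expects it.  A minimal sketch of the hand-off (the bucket and prefix are placeholders, and this assumes the cluster's Hadoop config already carries the S3 credentials, as it does on EMR):

hadoop fs -put agent_feedback.csv s3n://XXXXXXXXXX/feedback/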
So what I've done here is add almost no volume to our cluster ... but deliver a ton of value with variety.

And the survey results are not a one-trick pony; now that I have this data coming into my cluster, I can leverage it in new and unthought-of ways.  How about looking at student outcomes and CSAT scores?  Can falling CSAT predict bad outcomes?  How about the opposite?

Good questions start to arise only when we have the data to ask them.