Thursday, December 20, 2012

using MapReduce to move files

One of the biggest issues in dealing with large volumes of data is moving it around.  Thankfully, we use Amazon's S3 as a store and process the data on their EMR (Elastic MapReduce) service.  Access inside their network is pretty snappy.

Still, I am moving a lot of data into our Hadoop clusters every morning.  This data needs to go through our master and be distributed to all of the slave nodes (normally 7).  The following copy took a hair over 20 minutes:
hadoop fs -cp s3://eCollegeGradeData/activity/*.CSV /temp4/. 

12/12/20 17:12:50 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_20120127.CSV' for reading
12/12/20 17:12:51 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_20120128.CSV' for reading
12/12/20 17:12:51 INFO s3native.NativeS3FileSystem: Opening
...
12/12/20 17:32:22 INFO s3native.NativeS3FileSystem: Opening 's3://eCollegeGradeData/activity/Activity_KU_Historical_Extract_20120126_7.CSV' for reading

S3DistCp is an S3-enabled version of Apache Hadoop's DistCp that uses MapReduce (well, Map at least) to move the files directly from S3 to the Hadoop slave nodes, eliminating the master node bottleneck.  The run below uses the stock distcp command with an s3n:// source:

hadoop distcp s3n://eCollegeGradeData/activity/ /temp4/
12/12/20 17:41:49 INFO tools.DistCp: srcPaths=[s3n://eCollegeGradeData/activity]
12/12/20 17:41:49 INFO tools.DistCp: destPath=/temp4
12/12/20 17:41:50 INFO metrics.MetricsSaver: MetricsSaver DistCp root:hdfs:///mnt/var/lib/hadoop/metrics/ period:60 instanceId:i-b0bbf4ce jobflow:j-3IZ0J8F1W334P
12/12/20 17:41:50 INFO metrics.MetricsUtil: supported product mapr-m3
12/12/20 17:41:50 INFO metrics.MetricsSaver: Disable MetricsSaver due to MapR cluster
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO tools.DistCp: sourcePathsCount=337
12/12/20 17:41:51 INFO tools.DistCp: filesToCopyCount=336
12/12/20 17:41:51 INFO tools.DistCp: bytesToCopyCount=13.0g
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO fs.JobTrackerWatcher: Current running JobTracker is: ip-10-80-63-76.ec2.internal/10.80.63.76:9001
12/12/20 17:41:51 INFO mapred.JobClient: Running job: job_201212191754_0027
12/12/20 17:41:52 INFO mapred.JobClient:  map 0% reduce 0%
12/12/20 17:42:06 INFO mapred.JobClient:  map 2% reduce 0%
12/12/20 17:42:07 INFO mapred.JobClient:  map 5% reduce 0%
12/12/20 17:42:08 INFO mapred.JobClient:  map 8% reduce 0%
12/12/20 17:42:09 INFO mapred.JobClient:  map 9% reduce 0%
12/12/20 17:42:10 INFO mapred.JobClient:  map 14% reduce 0%
12/12/20 17:42:11 INFO mapred.JobClient:  map 16% reduce 0%
12/12/20 17:42:12 INFO mapred.JobClient:  map 29% reduce 0%
12/12/20 17:42:13 INFO mapred.JobClient:  map 31% reduce 0%
12/12/20 17:42:14 INFO mapred.JobClient:  map 39% reduce 0%
12/12/20 17:42:15 INFO mapred.JobClient:  map 41% reduce 0%
12/12/20 17:42:16 INFO mapred.JobClient:  map 43% reduce 0%
12/12/20 17:42:17 INFO mapred.JobClient:  map 45% reduce 0%
12/12/20 17:42:18 INFO mapred.JobClient:  map 47% reduce 0%
12/12/20 17:42:19 INFO mapred.JobClient:  map 50% reduce 0%
12/12/20 17:42:20 INFO mapred.JobClient:  map 51% reduce 0%
12/12/20 17:42:21 INFO mapred.JobClient:  map 53% reduce 0%
12/12/20 17:42:22 INFO mapred.JobClient:  map 54% reduce 0%
12/12/20 17:42:23 INFO mapred.JobClient:  map 56% reduce 0%
12/12/20 17:42:24 INFO mapred.JobClient:  map 57% reduce 0%
12/12/20 17:42:25 INFO mapred.JobClient:  map 58% reduce 0%
12/12/20 17:42:27 INFO mapred.JobClient:  map 60% reduce 0%
12/12/20 17:42:30 INFO mapred.JobClient:  map 65% reduce 0%
12/12/20 17:42:32 INFO mapred.JobClient:  map 66% reduce 0%
12/12/20 17:42:33 INFO mapred.JobClient:  map 69% reduce 0%
12/12/20 17:42:35 INFO mapred.JobClient:  map 70% reduce 0%
12/12/20 17:42:36 INFO mapred.JobClient:  map 71% reduce 0%
12/12/20 17:42:37 INFO mapred.JobClient:  map 73% reduce 0%
12/12/20 17:42:38 INFO mapred.JobClient:  map 74% reduce 0%
12/12/20 17:42:39 INFO mapred.JobClient:  map 75% reduce 0%
12/12/20 17:42:40 INFO mapred.JobClient:  map 76% reduce 0%
12/12/20 17:42:42 INFO mapred.JobClient:  map 77% reduce 0%
12/12/20 17:42:44 INFO mapred.JobClient:  map 78% reduce 0%
12/12/20 17:42:58 INFO mapred.JobClient:  map 79% reduce 0%
12/12/20 17:43:02 INFO mapred.JobClient:  map 83% reduce 0%
12/12/20 17:43:03 INFO mapred.JobClient:  map 86% reduce 0%
12/12/20 17:43:04 INFO mapred.JobClient:  map 87% reduce 0%
12/12/20 17:43:05 INFO mapred.JobClient:  map 88% reduce 0%
12/12/20 17:43:07 INFO mapred.JobClient:  map 90% reduce 0%
12/12/20 17:43:09 INFO mapred.JobClient:  map 92% reduce 0%
12/12/20 17:43:11 INFO mapred.JobClient:  map 93% reduce 0%
12/12/20 17:43:12 INFO mapred.JobClient:  map 94% reduce 0%
12/12/20 17:43:14 INFO mapred.JobClient:  map 97% reduce 0%
12/12/20 17:43:39 INFO mapred.JobClient:  map 98% reduce 0%
12/12/20 17:43:45 INFO mapred.JobClient:  map 99% reduce 0%
12/12/20 17:43:51 INFO mapred.JobClient:  map 100% reduce 0%
12/12/20 17:46:43 INFO mapred.JobClient: Job complete: job_201212191754_0027
12/12/20 17:46:43 INFO mapred.JobClient: Counters: 21
12/12/20 17:46:43 INFO mapred.JobClient:   Job Counters
12/12/20 17:46:43 INFO mapred.JobClient:     Aggregate execution time of mappers(ms)=1924883
12/12/20 17:46:43 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:     Launched map tasks=34
12/12/20 17:46:43 INFO mapred.JobClient:     Aggregate execution time of reducers(ms)=0
12/12/20 17:46:43 INFO mapred.JobClient:   FileSystemCounters
12/12/20 17:46:43 INFO mapred.JobClient:     MAPRFS_BYTES_READ=84770
12/12/20 17:46:43 INFO mapred.JobClient:     S3N_BYTES_READ=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     MAPRFS_BYTES_WRITTEN=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1659360
12/12/20 17:46:43 INFO mapred.JobClient:   distcp
12/12/20 17:46:43 INFO mapred.JobClient:     Files copied=336
12/12/20 17:46:43 INFO mapred.JobClient:     Bytes copied=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:     Bytes expected=13971069631
12/12/20 17:46:43 INFO mapred.JobClient:   Map-Reduce Framework
12/12/20 17:46:43 INFO mapred.JobClient:     Map input records=336
12/12/20 17:46:43 INFO mapred.JobClient:     PHYSICAL_MEMORY_BYTES=4710170624
12/12/20 17:46:43 INFO mapred.JobClient:     Spilled Records=0
12/12/20 17:46:43 INFO mapred.JobClient:     CPU_MILLISECONDS=530380
12/12/20 17:46:43 INFO mapred.JobClient:     VIRTUAL_MEMORY_BYTES=56869019648
12/12/20 17:46:43 INFO mapred.JobClient:     Map input bytes=50426
12/12/20 17:46:43 INFO mapred.JobClient:     Map output records=0
12/12/20 17:46:43 INFO mapred.JobClient:     SPLIT_RAW_BYTES=5100
12/12/20 17:46:43 INFO mapred.JobClient:     GC time elapsed (ms)=3356
12/12/20 17:46:43 INFO metrics.MetricsSaver: Inside MetricsSaver Shutdown Hook
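
As a rough mental model of why spreading the copy across map tasks helps, the sketch below divides a list of files among a number of workers so that each one gets a similar share of the bytes. This is not DistCp's actual splitting logic, which is not reproduced here. The function name, the greedy strategy, and the made-up file sizes are all assumptions for illustration; only the counts 336 (files) and 34 (map tasks) come from the log above.

```python
import heapq


def assign_files(file_sizes, num_workers):
    """Greedily hand each file (largest first) to the least-loaded worker.

    Hypothetical helper for illustration only; not how DistCp splits work.
    """
    # Min-heap of (bytes assigned so far, worker index).
    heap = [(0, worker) for worker in range(num_workers)]
    heapq.heapify(heap)
    assignments = [[] for _ in range(num_workers)]

    largest_first = sorted(file_sizes.items(), key=lambda kv: kv[1], reverse=True)
    for name, size in largest_first:
        load, worker = heapq.heappop(heap)
        assignments[worker].append(name)
        heapq.heappush(heap, (load + size, worker))
    return assignments


# Made-up file sizes (assumption); only the file count matches the log.
file_sizes = {
    f"file_{i:03d}.CSV": 10_000_000 + (i % 7) * 5_000_000 for i in range(336)
}

assignments = assign_files(file_sizes, num_workers=34)
loads = [sum(file_sizes[name] for name in files) for files in assignments]

print("workers:", len(assignments))
print("files assigned:", sum(len(files) for files in assignments))
print("largest share (bytes):", max(loads))
print("smallest share (bytes):", min(loads))
```

With each worker pulling its own share straight from S3, no single node has to carry the whole transfer, which is the bottleneck the single-process copy ran into.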

Now, I should have gone looking for a better solution months ago ... but frankly this wasn't that big of an issue.  I went from 20 minutes to 5 on this particular copy and have seen it go even lower than that.  If you add it all up, I was able to cut my total execution time by half an hour.
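
To put rough numbers on that, here is a back-of-the-envelope sketch that turns the timestamps and byte counter from the two logs above into throughput figures. It assumes the first copy moved the same 13,971,069,631 bytes that distcp reported, which the fs -cp log does not confirm. It also treats the last "Opening" line as the end of the first copy, so that elapsed time is a slight underestimate. The function names are my own.

```python
from datetime import datetime

# Timestamp layout used in the Hadoop logs, e.g. "12/12/20 17:41:49".
LOG_TIME_FORMAT = "%y/%m/%d %H:%M:%S"


def elapsed_seconds(start, end):
    """Seconds between two log timestamps."""
    t0 = datetime.strptime(start, LOG_TIME_FORMAT)
    t1 = datetime.strptime(end, LOG_TIME_FORMAT)
    return (t1 - t0).total_seconds()


def throughput_mb_per_s(num_bytes, seconds):
    """Throughput in decimal megabytes (10**6 bytes) per second."""
    return num_bytes / seconds / 1e6


# "Bytes copied" counter from the distcp run. Assumption: the fs -cp
# run moved the same amount of data.
BYTES_COPIED = 13_971_069_631

# First and last timestamps shown in each log.
cp_seconds = elapsed_seconds("12/12/20 17:12:50", "12/12/20 17:32:22")
distcp_seconds = elapsed_seconds("12/12/20 17:41:49", "12/12/20 17:46:43")

print(f"fs -cp: {cp_seconds:.0f} s, "
      f"{throughput_mb_per_s(BYTES_COPIED, cp_seconds):.1f} MB/s")
print(f"distcp: {distcp_seconds:.0f} s, "
      f"{throughput_mb_per_s(BYTES_COPIED, distcp_seconds):.1f} MB/s")
print(f"speedup: {cp_seconds / distcp_seconds:.1f}x")
```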

Not too shabby!