ก่อนหน้านี้เขียนถึงหลักการทำงานของ MapReduce ไปแล้ว คราวนี้มาลองลงมือเขียนโค้ดกันเลยดีกว่า ปกติแล้วการเขียน MapReduce เพื่อเอาไปใช้กับ Hadoop เราจะเขียนด้วย Java แต่ Hadoop นั้นมี API ตัวหนึ่งที่ทำให้เราเขียน MapReduce ด้วยภาษาอะไรก็ได้ Yes! และ API ตัวนั้นก็คือ Hadoop Streaming นั่นเอง ซึ่งตัวมันจะใช้ Unix standard streams เป็น interface ระหว่าง Hadoop กับโปรแกรมของเรา ดังนั้น.. ภาษาอะไรก็ตามที่สามารถอ่าน standard input และเขียนใส่ standard output ได้ ภาษานั้นก็สามารถใช้เขียน MapReduce ได้! (-/\-) กราบ
ใครก็ตามที่ผ่านเข้ามาอ่านบทความนี้ ไม่ต้องกังวลว่าจะต้องไปลง Hadoop บนเครื่องตัวเองนะครับ ขอแค่คุณใช้ระบบปฏิบัติการที่เป็น Linux ก็พอ ก็สามารถทำตามได้ (ถ้าใครใช้ Windows ก็ใช้ Cygwin แทนเนอะ)
ขอยกเอารูปเดิมจากบล็อกที่แล้วมาแปะไว้ให้ดูกันอีกรอบก่อน ที่โพสต์นั้นได้บอกไปว่าไม่ต้องสนใจ Unix command แต่ในโพสต์นี้เราต้องมาสนใจมันหน่อยละ 🙂
ถ้าคุ้นเคยกับพวก command line อาจจะคุ้นๆ ว่าในรูปคือการใช้ Pipeline นั่นเอง นั่นคือเราจะใช้ cat อ่านไฟล์ แล้วพ่น output ต่อไปยังฟังก์ชั่น map ที่เราเขียนไว้โดยจะส่งต่อให้คำสั่ง sort ของ Unix และส่งต่อผลลัพธ์ไปยังฟังก์ชั่น reduce สุดท้ายเราก็จะได้ output สุดท้ายออกมา สิ่งที่เราทำอยู่นี้เป็นการจำลองการทำงานของ MapReduce บน Hadoop นั่นเอง โดยที่เราไม่ต้องลงไว้ในเครื่อง และก็เป็น practice หนึ่งที่ควรจะทำเป็น เพราะเราควรจะใช้ practice นี้ลองกับข้อมูลน้อยๆ ก่อน เพื่อพิสูจน์ว่าเราเขียนโปรแกรมถูกต้อง แล้วค่อยเอาไปรันกับข้อมูลใหญ่ๆ กับ Hadoop ต่อไป
เกริ่นมาพอสมควร ลงมือทำเลยดีกว่า ข้อมูลที่เราจะนำมาลอง ผมเอามาจากคอร์ส Introduction to Hadoop and MapReduce ของ Udacity คอร์สนี้เปิดฟรี ใครอยากลองเชิญเลยนะ ^^ ของเค้าดีจริง แต่ข้อมูลนั้นผมตัดมาให้เหลือแค่ 30 เรคคอร์ด ตั้งชื่อไฟล์ว่า purchases.txt เป็นข้อมูลเกี่ยวกับการซื้อสินค้าของร้านๆ หนึ่งในเมืองต่างๆ ตามนี้
2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex 2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa 2012-01-01 09:00 Pittsburgh Pet Supplies 493.51 Discover 2012-01-01 09:00 Omaha Children's Clothing 235.63 MasterCard 2012-01-01 09:00 Stockton Men's Clothing 247.18 MasterCard 2012-01-01 09:00 Austin Cameras 379.6 Visa 2012-01-01 09:00 Fort Worth Toys 213.88 Visa 2012-01-01 09:00 Las Vegas Video Games 53.26 Visa 2012-01-01 09:00 Austin Cameras 469.63 MasterCard 2012-01-01 09:00 Lincoln Garden 136.9 Visa 2012-01-01 09:00 San Jose Women's Clothing 215.82 Cash 2012-01-01 09:00 Las Vegas Books 93.39 Visa 2012-01-01 09:00 Virginia Beach Children's Clothing 376.11 Amex 2012-01-01 09:01 Riverside Consumer Electronics 252.88 Cash 2012-01-01 09:01 Reno Crafts 88.25 Visa 2012-01-01 09:01 Chicago Books 31.08 Cash 2012-01-01 09:01 Madison Men's Clothing 16.78 Visa 2012-01-01 09:01 Austin Sporting Goods 327.75 Discover 2012-01-01 09:01 Portland CDs 108.69 Amex 2012-01-01 09:01 Riverside Sporting Goods 15.41 Discover 2012-01-01 09:01 Reno Toys 80.46 Visa 2012-01-01 09:01 Anchorage Music 298.86 MasterCard 2012-01-01 09:01 Pittsburgh Sporting Goods 475.26 Amex 2012-01-01 09:01 Spokane Garden 3.85 Amex 2012-01-01 09:01 Spokane Computers 287.65 MasterCard 2012-01-01 09:01 Omaha Baby 255.68 MasterCard 2012-01-01 09:01 Anchorage DVDs 6.38 Amex 2012-01-01 09:01 Anchorage Crafts 22.36 Amex 2012-01-01 09:02 Chandler Books 414.08 Cash 2012-01-01 09:02 Chandler Books 344.09 Discover
แต่ละคอลัมน์ (ใช้ tab แบ่ง) เรียงจากซ้ายไปขวาคือ วัน เวลา ชื่อเมืองที่มีร้านนั้นอยู่ ชนิดสินค้า ราคา วิธีการจ่ายเงิน เราจะมาเขียน MapReduce เพื่อหาผลรวมของยอดขายของร้านแต่ละร้านกัน
ขั้นตอนต่อไปเราจะเขียน Mapper ขึ้นมา มีหน้าที่หลักๆ คือ preprocessing และกรองข้อมูลเพื่อนำไปใช้ในส่วนของ Reducer และด้วยความ lnw ของ Hadoop Streaming ทำให้เราสามาถเขียนภาษาอะไรก็ได้ ดังนั้นในทีนี้ผมขอลอง Ruby นะ ด้วยความอยากส่วนตัวล้วนๆ
ไฟล์ mapper.rb
STDIN.each_line do |line| date, time, store, item, cost, payment = line.split("\t") puts "#{store}\t#{cost}" end
โค้ดนี้แค่อ่าน standard input เข้ามา กรองข้อมูล แล้วก็พ่นออกไปยัง standard output ง่ายไปไหม? ก็ง่ายๆ แบบนี้แหละ ความยากจริงๆ ไม่ได้อยู่ที่การเขียนโค้ดนะ มันอยู่ที่ว่าเราจะอยากได้คำตอบอะไรจากข้อมูลนั้นๆ และการออกแบบ Mapper กับ Reducer เพื่อคำตอบนั้นมากกว่า
พอเราเขียน Mapper เสร็จ เราสามารถทดสอบโค้ดเราได้เลย ใช้คำสั่งด้านล่างนี้ได้เลย
cat purchases.txt | ruby mapper.rb
โดยไม่จำเป็นต้องรันทั้ง pipeline 🙂 จะได้ผลลัพธ์ออกมาตามนี้
San Jose 214.05 Fort Worth 153.57 Pittsburgh 493.51 Omaha 235.63 Stockton 247.18 Austin 379.6 Fort Worth 213.88 Las Vegas 53.26 Austin 469.63 Lincoln 136.9 San Jose 215.82 Las Vegas 93.39 Virginia Beach 376.11 Riverside 252.88 Reno 88.25 Chicago 31.08 Madison 16.78 Austin 327.75 Portland 108.69 Riverside 15.41 Reno 80.46 Anchorage 298.86 Pittsburgh 475.26 Spokane 3.85 Spokane 287.65 Omaha 255.68 Anchorage 6.38 Anchorage 22.36 Chandler 414.08 Chandler 344.09
ลองไปดูตัว Reducer กันบ้าง ตรงนี้จะวุ่นวายขึ้นมาอีกเล็กน้อย
ไฟล์ reducer.rb
last_key, total = nil, 0 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{count}\t#{total}" last_key, total = key, val.to_f else last_key, total = key, total += val.to_f end end puts "#{last_key}\t#{count}\t#{total}" if last_key
โค้ดนี้จะอ่าน standard input เข้ามา แล้วก็รวบรวมยอดขาย และสุดท้ายก็พ่นผลลัพธ์ใส่ standard output
การประมวลผลใน Reducer จะเป็นแบบ batch จะเห็นได้ว่าส่วนที่วุ่นวายจะเป็นส่วนที่เราต้องเช็คว่า key ที่ส่งเข้ามาเปลี่ยนไปยัง key ต่อไปหรือยัง ตรงนี้อาจจะสงสัยว่ารู้ได้อย่างไรว่า key มันจะมาแบบไหน ถ้าไม่รู้แล้วเขียนเช็ค key แบบนี้ได้อย่างไร คำตอบคือ เรารู้ครับว่า key มันจะมาไหน มันจะมาแบบที่ sort แล้วครับ ถ้ายัง งง อยู่ ลองย้อนกลับไปดูที่รูป Data Flow ข้างบน จะเห็นได้ว่า ก่อนที่ข้อมูลจะส่งเข้ามายังตัว Reducer จะมีการ shuffle & sort ข้อมูลตาม key ก่อนครับ
เอาละ เขียนเสร็จแล้วทั้ง Mapper และ Reducer สามารถรันทั้ง pipeline โดยใช้คำสั่ง
cat purchases.txt | ruby mapper.rb | sort | ruby reducer.rb
จะได้ผลลัพธ์สุดท้ายดังนี้
Anchorage 327.6 Austin 1176.98 Chandler 758.17 Chicago 31.08 Fort Worth 367.45 Las Vegas 146.65 Lincoln 136.9 Madison 16.78 Omaha 491.31 Pittsburgh 968.77 Portland 108.69 Reno 168.70999999999998 Riverside 268.29 San Jose 429.87 Spokane 291.5 Stockton 247.18 Virginia Beach 376.11
เสร็จสิ้นทุกกระบวนการ ถ้าใครขี้เกียจลง Hadoop บนเครื่องตัวเองก็ทำแบบที่เขียนไว้ได้เลยนะครับ 😀
ปล. แนะนำถ้าใครอยากลง Hadoop บนเครื่องตัวเอง ให้ลง Cloudera’s Distribution Including Apache Hadoop (CDH) แทน ซึ่งเป็น open source ที่มีทุกอย่างครบ แทบจะไม่ต้องไปโหลดเพิ่มอีกแล้ว