ลองเขียน MapReduce กัน

ก่อนหน้านี้เขียนถึงหลักการทำงานของ MapReduce ไปแล้ว คราวนี้มาลองลงมือเขียนโค้ดกันเลยดีกว่า ปกติแล้วการเขียน MapReduce เพื่อเอาไปใช้กับ Hadoop เราจะเขียนด้วย Java แต่ Hadoop นั้นมี API ตัวหนึ่งที่ทำให้เราเขียน MapReduce ด้วยภาษาอะไรก็ได้ Yes! และ API ตัวนั้นก็คือ Hadoop Streaming นั่นเอง ซึ่งตัวมันจะใช้ Unix standard streams เป็น interface ระหว่าง Hadoop กับโปรแกรมของเรา ดังนั้น.. ภาษาอะไรก็ตามที่สามารถอ่าน standard input และเขียนใส่ standard output ได้ ภาษานั้นก็สามารถใช้เขียน MapReduce ได้! (-/\-) กราบ

ใครก็ตามที่ผ่านเข้ามาอ่านบทความนี้ ไม่ต้องกังวลว่าจะต้องไปลง Hadoop บนเครื่องตัวเองนะครับ ขอแค่คุณใช้ระบบปฏิบัติการที่เป็น Linux ก็พอ ก็สามารถทำตามได้ (ถ้าใครใช้ Windows ก็ใช้ Cygwin แทนเนอะ)

ขอยกเอารูปเดิมจากบล็อกที่แล้วมาแปะไว้ให้ดูกันอีกรอบก่อน ที่โพสต์นั้นได้บอกไปว่าไม่ต้องสนใจ Unix command แต่ในโพสต์นี้เราต้องมาสนใจมันหน่อยละ 🙂

MapReduce Logical Data Flow

ถ้าคุ้นเคยกับพวก command line อาจจะคุ้นๆ ว่าในรูปคือการใช้ Pipeline นั่นเอง นั่นคือเราจะใช้ cat อ่านไฟล์ แล้วพ่น output ต่อไปยังฟังก์ชั่น map ที่เราเขียนไว้โดยจะส่งต่อให้คำสั่ง sort ของ Unix และส่งต่อผลลัพธ์ไปยังฟังก์ชั่น reduce สุดท้ายเราก็จะได้ output สุดท้ายออกมา สิ่งที่เราทำอยู่นี้เป็นการจำลองการทำงานของ MapReduce บน Hadoop นั่นเอง โดยที่เราไม่ต้องลงไว้ในเครื่อง และก็เป็น practice หนึ่งที่ควรจะทำเป็น เพราะเราควรจะใช้ practice นี้ลองกับข้อมูลน้อยๆ ก่อน เพื่อพิสูจน์ว่าเราเขียนโปรแกรมถูกต้อง แล้วค่อยเอาไปรันกับข้อมูลใหญ่ๆ กับ Hadoop ต่อไป

เกริ่นมาพอสมควร ลงมือทำเลยดีกว่า ข้อมูลที่เราจะนำมาลอง ผมเอามาจากคอร์ส Introduction to Hadoop and MapReduce ของ Udacity คอร์สนี้เปิดฟรี ใครอยากลองเชิญเลยนะ ^^ ของเค้าดีจริง แต่ข้อมูลนั้นผมตัดมาให้เหลือแค่ 30 เรคคอร์ด ตั้งชื่อไฟล์ว่า purchases.txt เป็นข้อมูลเกี่ยวกับการซื้อสินค้าของร้านๆ หนึ่งในเมืองต่างๆ ตามนี้

2012-01-01	09:00	San Jose	Men's Clothing	214.05	Amex
2012-01-01	09:00	Fort Worth	Women's Clothing	153.57	Visa
2012-01-01	09:00	Pittsburgh	Pet Supplies	493.51	Discover
2012-01-01	09:00	Omaha	Children's Clothing	235.63	MasterCard
2012-01-01	09:00	Stockton	Men's Clothing	247.18	MasterCard
2012-01-01	09:00	Austin	Cameras	379.6	Visa
2012-01-01	09:00	Fort Worth	Toys	213.88	Visa
2012-01-01	09:00	Las Vegas	Video Games	53.26	Visa
2012-01-01	09:00	Austin	Cameras	469.63	MasterCard
2012-01-01	09:00	Lincoln	Garden	136.9	Visa
2012-01-01	09:00	San Jose	Women's Clothing	215.82	Cash
2012-01-01	09:00	Las Vegas	Books	93.39	Visa
2012-01-01	09:00	Virginia Beach	Children's Clothing	376.11	Amex
2012-01-01	09:01	Riverside	Consumer Electronics	252.88	Cash
2012-01-01	09:01	Reno	Crafts	88.25	Visa
2012-01-01	09:01	Chicago	Books	31.08	Cash
2012-01-01	09:01	Madison	Men's Clothing	16.78	Visa
2012-01-01	09:01	Austin	Sporting Goods	327.75	Discover
2012-01-01	09:01	Portland	CDs	108.69	Amex
2012-01-01	09:01	Riverside	Sporting Goods	15.41	Discover
2012-01-01	09:01	Reno	Toys	80.46	Visa
2012-01-01	09:01	Anchorage	Music	298.86	MasterCard
2012-01-01	09:01	Pittsburgh	Sporting Goods	475.26	Amex
2012-01-01	09:01	Spokane	Garden	3.85	Amex
2012-01-01	09:01	Spokane	Computers	287.65	MasterCard
2012-01-01	09:01	Omaha	Baby	255.68	MasterCard
2012-01-01	09:01	Anchorage	DVDs	6.38	Amex
2012-01-01	09:01	Anchorage	Crafts	22.36	Amex
2012-01-01	09:02	Chandler	Books	414.08	Cash
2012-01-01	09:02	Chandler	Books	344.09	Discover

แต่ละคอลัมน์ (ใช้ tab แบ่ง) เรียงจากซ้ายไปขวาคือ วัน เวลา ชื่อเมืองที่มีร้านนั้นอยู่ ชนิดสินค้า ราคา วิธีการจ่ายเงิน เราจะมาเขียน MapReduce เพื่อหาผลรวมของยอดขายของร้านแต่ละร้านกัน

ขั้นตอนต่อไปเราจะเขียน Mapper ขึ้นมา มีหน้าที่หลักๆ คือ preprocessing และกรองข้อมูลเพื่อนำไปใช้ในส่วนของ Reducer และด้วยความ lnw ของ Hadoop Streaming ทำให้เราสามาถเขียนภาษาอะไรก็ได้ ดังนั้นในทีนี้ผมขอลอง Ruby นะ ด้วยความอยากส่วนตัวล้วนๆ

ไฟล์ mapper.rb

STDIN.each_line do |line|
    date, time, store, item, cost, payment = line.split("\t")
    puts "#{store}\t#{cost}"
end

โค้ดนี้แค่อ่าน standard input เข้ามา กรองข้อมูล แล้วก็พ่นออกไปยัง standard output ง่ายไปไหม? ก็ง่ายๆ แบบนี้แหละ ความยากจริงๆ ไม่ได้อยู่ที่การเขียนโค้ดนะ มันอยู่ที่ว่าเราจะอยากได้คำตอบอะไรจากข้อมูลนั้นๆ และการออกแบบ Mapper กับ Reducer เพื่อคำตอบนั้นมากกว่า

พอเราเขียน Mapper เสร็จ เราสามารถทดสอบโค้ดเราได้เลย ใช้คำสั่งด้านล่างนี้ได้เลย

cat purchases.txt | ruby mapper.rb

โดยไม่จำเป็นต้องรันทั้ง pipeline 🙂 จะได้ผลลัพธ์ออกมาตามนี้

San Jose	214.05
Fort Worth	153.57
Pittsburgh	493.51
Omaha	235.63
Stockton	247.18
Austin	379.6
Fort Worth	213.88
Las Vegas	53.26
Austin	469.63
Lincoln	136.9
San Jose	215.82
Las Vegas	93.39
Virginia Beach	376.11
Riverside	252.88
Reno	88.25
Chicago	31.08
Madison	16.78
Austin	327.75
Portland	108.69
Riverside	15.41
Reno	80.46
Anchorage	298.86
Pittsburgh	475.26
Spokane	3.85
Spokane	287.65
Omaha	255.68
Anchorage	6.38
Anchorage	22.36
Chandler	414.08
Chandler	344.09

ลองไปดูตัว Reducer กันบ้าง ตรงนี้จะวุ่นวายขึ้นมาอีกเล็กน้อย

ไฟล์ reducer.rb

last_key, total = nil, 0

STDIN.each_line do |line|
    key, val = line.split("\t")
    if last_key && last_key != key
        puts "#{last_key}\t#{count}\t#{total}"
        last_key, total = key, val.to_f
    else
        last_key, total = key, total += val.to_f
    end
end

puts "#{last_key}\t#{count}\t#{total}" if last_key

โค้ดนี้จะอ่าน standard input เข้ามา แล้วก็รวบรวมยอดขาย และสุดท้ายก็พ่นผลลัพธ์ใส่ standard output

การประมวลผลใน Reducer จะเป็นแบบ batch จะเห็นได้ว่าส่วนที่วุ่นวายจะเป็นส่วนที่เราต้องเช็คว่า key ที่ส่งเข้ามาเปลี่ยนไปยัง key ต่อไปหรือยัง ตรงนี้อาจจะสงสัยว่ารู้ได้อย่างไรว่า key มันจะมาแบบไหน ถ้าไม่รู้แล้วเขียนเช็ค key แบบนี้ได้อย่างไร คำตอบคือ เรารู้ครับว่า key มันจะมาไหน มันจะมาแบบที่ sort แล้วครับ ถ้ายัง งง อยู่ ลองย้อนกลับไปดูที่รูป Data Flow ข้างบน จะเห็นได้ว่า ก่อนที่ข้อมูลจะส่งเข้ามายังตัว Reducer จะมีการ shuffle & sort ข้อมูลตาม key ก่อนครับ

เอาละ เขียนเสร็จแล้วทั้ง Mapper และ Reducer สามารถรันทั้ง pipeline โดยใช้คำสั่ง

cat purchases.txt | ruby mapper.rb | sort | ruby reducer.rb

จะได้ผลลัพธ์สุดท้ายดังนี้

Anchorage	327.6
Austin	1176.98
Chandler	758.17
Chicago	31.08
Fort Worth	367.45
Las Vegas	146.65
Lincoln	136.9
Madison	16.78
Omaha	491.31
Pittsburgh	968.77
Portland	108.69
Reno	168.70999999999998
Riverside	268.29
San Jose	429.87
Spokane	291.5
Stockton	247.18
Virginia Beach	376.11

เสร็จสิ้นทุกกระบวนการ ถ้าใครขี้เกียจลง Hadoop บนเครื่องตัวเองก็ทำแบบที่เขียนไว้ได้เลยนะครับ 😀

ปล. แนะนำถ้าใครอยากลง Hadoop บนเครื่องตัวเอง ให้ลง Cloudera’s Distribution Including Apache Hadoop (CDH) แทน ซึ่งเป็น open source ที่มีทุกอย่างครบ แทบจะไม่ต้องไปโหลดเพิ่มอีกแล้ว

Credit: Hadoop: The Definitive Guide เขียนโดย Tom White

Author: zkan

Soon to be a newbie data scientist. I ♥ machine learning, computer vision, robotics, image processing, data visualization, and data analytics.

Leave a Reply

Your email address will not be published. Required fields are marked *