Extract zip code information using Spark

I kept hearing that **Spark is fast**, so I gave it a try.

Initial setup

I installed it with Homebrew as follows.

brew install apache-spark

Data acquisition

I downloaded the zip code data in CSV format from the following site: zipcloud

How to start

Because Spark was installed with Homebrew, move to the following apache-spark folder.

cd /usr/local/Cellar/apache-spark/1.5.2/bin/

Spark apparently supports Scala, Java, Python, and R. I wanted to use Python, so I started it with the following command.

pyspark

If you see the "Spark" logo, it is working.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Python version 2.7.11 (default, Dec 26 2015 17:47:53)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

Implementation details

The nationwide zip code data contains about 120,000 records. To simulate processing a large amount of data, before implementing anything I copied and pasted the data 88 times, inflating it to about 12 million records, and saved the result as a new CSV file. I then implemented a search that combines the following two conditions with AND:

- **The zip code contains "7"**
- **The municipality name includes an animal name**
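
The enlargement step can also be scripted instead of done by hand. The following is a minimal sketch, not the method used for the measurements in this post: `replicate_file` is a hypothetical helper, and the input name `KEN_ALL.CSV` in the usage comment is an assumption, since the post does not name the downloaded file. It copies raw bytes, so it assumes the source file ends with a newline.

```python
# -*- coding: utf-8 -*-
import shutil


def replicate_file(src_path, dst_path, copies):
    """Write `copies` back-to-back copies of src_path into dst_path."""
    with open(dst_path, 'wb') as dst:
        for _ in range(copies):
            with open(src_path, 'rb') as src:
                # Copy raw bytes so the file's encoding is left untouched
                shutil.copyfileobj(src, dst)


# Hypothetical usage (input file name is an assumption):
#   replicate_file('KEN_ALL.CSV', 'KEN_ALL_OVER_TEN_MILLION.CSV', 88)
```

Copying bytes rather than decoded text avoids having to know the encoding of the downloaded CSV.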

sparkSample.py


# -*- coding: utf-8 -*-
import time
from pyspark import SparkContext

def main():
    #Kanji you want to search
    queryList = ["deer","bird","bear","monkey","dog"]
    
    #Time measurement start
    start = time.time()
    
    #data set
    sc = SparkContext('local', 'Simple App')
    logData = sc.textFile('KEN_ALL_OVER_TEN_MILLION.CSV')
    
    #Extract information for each query
    for item in queryList:
        #List with split
        lines = logData.map(lambda x: x.split(','))
        #Extract records whose zip code contains 7
        numberPicks = lines.filter(lambda s: unicode('7', 'utf-8') in s[2])
        #Of those, extract records whose municipality name includes the target kanji (AND condition)
        namePicks = numberPicks.filter(lambda s: unicode(item, 'utf-8') in s[7])
        #Store in list
        desList = namePicks.collect()

        #Log output: join the fields of each record with ', '
        for line in desList:
            print u', '.join(line).encode('utf-8')

        #Hit count output
        outlog = "query:" + item.decode('utf-8') + u", count:" + \
            unicode(str(len(desList)), 'utf-8') + ", Time:{0}".format(time.time() - start) + u"[sec]"
        print outlog.encode('utf-8')    
    
    #Time measurement stop
    finish_time = time.time() - start
    print u"Time[total]:{0}".format(finish_time) + u"[sec]"

    #End processing
    sc.stop()

if __name__ == '__main__':
    main()
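
The AND condition described earlier can also be written as a single predicate, which makes it easy to check on a few hand-made rows before running it over millions of records. This is only a sketch: `matches` and the two column constants are hypothetical names, and the indices 2 and 7 are taken from the listing above.

```python
# -*- coding: utf-8 -*-

ZIP_COLUMN = 2    # column index used for the zip code in sparkSample.py
CITY_COLUMN = 7   # column index used for the municipality name


def matches(fields, digit, keyword):
    """Return True when the zip code contains `digit` AND the
    municipality name contains `keyword`."""
    if len(fields) <= CITY_COLUMN:
        # Skip short or malformed rows instead of raising IndexError
        return False
    return digit in fields[ZIP_COLUMN] and keyword in fields[CITY_COLUMN]


# Hypothetical use with the RDD from the listing above (Python 2):
#   namePicks = lines.filter(lambda s: matches(s, u'7', unicode(item, 'utf-8')))
```

Keeping both checks in one function means the Spark version and the plain version can share exactly the same condition.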

For comparison, I also wrote a version that does not use Spark.

plain.py


# -*- coding: utf-8 -*-
import time

def pickAnimal(recordList, qList, start):
    #Extract information for each query
    for q in qList:
        count = 0
        for record in recordList:
            sepRecord = record.split(",")
            if len(sepRecord) == 15:
                #Extract records whose zip code contains 7 and whose
                #municipality name includes the target kanji
                if "7" in sepRecord[2] and q in sepRecord[7]:
                    count = count + 1
                    #Log output
                    print record
        #Hit count output
        print "query:" + q + ", count:" + str(count) + ", Time:{0}".format(time.time() - start) + "[sec]"

def main():

    sepRecordList = []
    #Kanji you want to search
    queryList = ["deer","bird","bear","monkey","dog"]

    #data set
    srcpath = "KEN_ALL_OVER_TEN_MILLION.CSV"
    srcIN = open(srcpath, 'r')

    #Time measurement start
    start = time.time()
    for line in srcIN:
        sepRecordList.append(line)

    pickAnimal(sepRecordList, queryList, start)

    #Time measurement stop
    finish_time = time.time() - start
    print "Time:{0}".format(finish_time) + "[sec]"

    #End processing
    srcIN.close()

if __name__ == '__main__':
    main()

Measurement result

$pyspark sparkSample.py
~(Omission)~
Time[total]:645.52906394[sec]
$python plain.py
~(Omission)~
Time:112.966698885[sec]

Hmm, the plain implementation is about six times faster...
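
The "about six times" figure comes straight from the two totals above. As a quick check of the arithmetic (the variable names are illustrative only):

```python
spark_seconds = 645.52906394    # Time[total] reported by sparkSample.py
plain_seconds = 112.966698885   # Time reported by plain.py

ratio = spark_seconds / plain_seconds
print("plain.py was {0:.2f} times faster".format(ratio))
# prints: plain.py was 5.71 times faster
```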

Afterword

To actually see the speed benefit, it seems necessary either to set up an environment for distributed processing or to run repeated passes over a large amount of data, as in machine learning. My next goal is to experience the power of Spark firsthand.
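
One setting that may be worth checking before moving to a cluster: sparkSample.py creates its context with the master string 'local', which the Spark documentation describes as running locally with a single worker thread. The sketch below builds the documented 'local[N]' and 'local[*]' forms. `local_master` is a hypothetical helper, and whether more threads would close the gap measured above was not tested here.

```python
def local_master(threads=None):
    """Build a Spark master string for local mode.

    threads=None -> 'local[*]' (one worker thread per logical core)
    threads=N    -> 'local[N]' (N worker threads)
    """
    if threads is None:
        return 'local[*]'
    if threads < 1:
        raise ValueError('threads must be at least 1')
    return 'local[{0}]'.format(threads)


# Hypothetical change to sparkSample.py:
#   sc = SparkContext(local_master(), 'Simple App')
```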

References

- SparkContext memo (http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/SparkContext.html)

- Operating Apache Spark with Python (pyspark) (http://symfoware.blog68.fc2.com/blog-entry-1188.html)
