I can't seem to break the habit of making useless things. This time, I wrote an article about the code of a line counter that is uselessly multithreaded.
For some files it is faster than wc -l, so it may be useful if you want to count lines as fast as possible.
My Go is self-taught, so I welcome suggestions about mistakes and better ways to write things.
"Parallel processing in Go language" (Why didn't you read it earlier?) Rubbing io.Reader (Tell me about Go's hidden?
The program counts the number of lines in the file specified by the argument and displays the result on the screen.
The following can be specified as arguments:
-File name (-f)
-Number of divisions for split reading (-s)
-Number of threads for parallel processing (-t)
-Read buffer size (-b)
Features:
-Counts lines with multithreaded processing using goroutines and channels
-Works through the file via io.Reader (in the spirit of "io.Reader sukore")
-Shifts the read start position with Seek and uselessly reads the file in divided chunks
-Searches the read buffer directly, so it is economical with allocations
The general processing flow is as follows.
The whole code is here
Import the required libraries. flag is used to receive the arguments and glog for logging.
package main

import (
	"bytes"
	"flag"
	"fmt"
	"io"
	"math"
	"os"
	"sync"
	"time"

	"github.com/golang/glog"
)
Define a struct to receive the arguments, and declare a variable of that type.
// Arg is a struct for the command-line arguments.
type Arg struct {
	// File to be processed
	targetFile string
	// Number of divisions for split reading
	splitNum int
	// Maximum number of threads running concurrently
	maxThreads int
	// Size of the buffer for reading the file
	buffersize int
}

var (
	arg Arg
)
The argument setup goes in the init function.
Since glog defines its own command-line arguments, we use flag to redefine and initialize glog's arguments. When the command help is displayed, glog's argument descriptions are shown too, which is confusing, so I prefix this program's own argument descriptions with "(go-lc)" to tell them apart (there is probably a smarter way to write this).
func init() {
	// Set the help message
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "%s\n", fmt.Sprintf("%s -f TARGETFILE [options] [glog options]",
			os.Args[0]))
		flag.PrintDefaults()
	}
	// Initialize the logger settings
	_ = flag.Set("stderrthreshold", "INFO")
	_ = flag.Set("v", "0")
	// Set the command-line options
	flag.StringVar(&arg.targetFile, "f", "", "(go-lc) Target File.")
	flag.IntVar(&arg.splitNum, "s", 2, "(go-lc) Num of File split.")
	flag.IntVar(&arg.maxThreads, "t", 2, "(go-lc) Max Num of Threads.")
	flag.IntVar(&arg.buffersize, "b", 1024*1024, "(go-lc) Size of ReadBuffer(default=1024*1024).")
}
With this configuration, the command help is displayed as follows. The glog options make it quite noisy.
(Noisy, but handy for a small tool, since you don't have to implement log-level switching yourself.)
> ./bin/go-lc --help
./bin/go-lc -f TARGETFILE [options] [glog options]
-alsologtostderr
log to standard error as well as files
-b int
(go-lc) Size of ReadBuffer (default 1048576)
-f string
(go-lc) Target File
-log_backtrace_at value
when logging hits line file:N, emit a stack trace
-log_dir string
If non-empty, write log files in this directory
-logtostderr
log to standard error instead of files
-s int
(go-lc) Num of File split (default 2)
-stderrthreshold value
logs at or above this threshold go to stderr
-t int
(go-lc) Max Num of Threads (default 2)
-v value
log level for V logs
-vmodule value
comma-separated list of pattern=N settings for file-filtered logging
Let me explain the main function first. That said, it just calls getNumOfLines to do the counting, receives the result, and prints it to the screen.
func main() {
	flag.Parse()
	glog.V(1).Infof("Start")
	// Start the timer for measuring processing time
	startTime := time.Now()
	// Run the line counting
	numOfLines, _ := getNumOfLines(arg.targetFile, arg.splitNum, arg.maxThreads, arg.buffersize)
	// Show the processing time
	glog.V(1).Infof("End(%s)", time.Since(startTime))
	fmt.Printf("%d\n", numOfLines)
}
getFileSize()
A function that gets the file size. See the comments for the details.
func getFileSize(filename string) (int, error) {
	// Open the target file
	fh, err := os.OpenFile(filename, 0, 0)
	if err != nil {
		return 0, err
	}
	defer fh.Close()
	// Get the file information
	fileinfo, err := fh.Stat()
	if err != nil {
		return 0, err
	}
	// Return the number of bytes in the file
	return int(fileinfo.Size()), nil
}
getNumOfLines()
This is the getNumOfLines() called from main. It's a somewhat long function, so I'll explain it in parts.
The first part calculates how many buffer reads are needed for the whole file:
*** Number of reads = file size / buffer size (+1 if not evenly divisible) ***
func getNumOfLines(filename string, splitNum int, maxThreads int, buffersize int) (int, error) {
	// Get the file size
	fsize, err := getFileSize(filename)
	if err != nil {
		return 0, err
	}
	// Informational output at log level 1
	glog.V(1).Infof("FileSize : %10d byte", fsize)
	glog.V(1).Infof("Read buffer: %10d byte", buffersize)
	glog.V(1).Infof("Max Threads: %d", maxThreads)
	glog.V(1).Infof("Split Num : %d", splitNum)
	// Calculate how many reads of buffersize bytes cover the file
	var readCountTotal int = int(math.Trunc(float64(fsize) / float64(buffersize)))
	// If there is a remainder, one more read is needed
	if fsize-(readCountTotal*buffersize) > 0 {
		readCountTotal++
	}
Next, we set up the multithreading. The WaitGroup might be avoidable with a cleverer use of channels, but it is used here for clarity.
	// Initialize the WaitGroup for waiting on completion
	wg := &sync.WaitGroup{}
	// Channel that limits the number of concurrently running goroutines
	jc := make(chan interface{}, maxThreads)
	defer close(jc)
	// Channel for receiving each goroutine's line count
	counterCh := make(chan int, maxThreads)
	// Channel used by the receiver goroutine to return
	// the aggregated result to the main flow
	resultCh := make(chan int)
	defer close(resultCh)
	// Start the goroutine that receives the results;
	// it finishes when counterCh is closed
	go func(counterCh <-chan int) {
		cAll := 0
		for c := range counterCh {
			cAll += c
			glog.V(2).Infof("[receiver] receive: %d\n", c)
		}
		resultCh <- cAll
	}(counterCh)
Next comes the loop that starts the line-counting goroutines (countWorker). byteOffset is the read start position. eachReadCount is the number of buffer reads each goroutine performs; the calculation method comes from the article "Split integers as evenly as possible" (some people are clever).
A channel called jc controls the number of goroutines running at once. This is a standard pattern; articles such as "How to control the maximum number of goroutines" and "Limit the degree of parallelism to the number of CPUs in Go" explain it well. Incidentally, those two articles use bool and int channels, but according to "Concurrency in Go" (O'Reilly), an element type of size zero (the empty struct) is a good fit for a channel that exists only to limit concurrency.
	// Read start position passed to each goroutine (0 for the first one)
	var byteOffset int64 = 0
	// Loop that starts the line-counting goroutines
	for i := 0; i < splitNum; i++ {
		// How many buffer reads this worker performs in countWorker
		eachReadCount := int(math.Trunc(float64(readCountTotal+i) / float64(splitNum)))
		// Take one slot in the concurrency-limiting channel
		jc <- true
		// Increase the WaitGroup count by one
		wg.Add(1)
		// Start a line-counting goroutine
		go countWorker(filename, eachReadCount, byteOffset, buffersize, wg, jc, counterCh)
		// Advance the read offset
		byteOffset += int64(eachReadCount * buffersize)
	}
	wg.Wait()
	close(counterCh)
	return <-resultCh, nil
}
countWorker()
Next, define countWorker(), the body of each goroutine. All it does is open the file given by filename, move the read position with f.Seek, and call getNumOfCharsOnIo. Opening the same file once per worker looks redundant, but each worker needs its own handle so the Seek positions don't interfere. (Whether splitting the read is meaningful at all is debatable, but in grown-up development, sticking with the original plan sometimes matters.)
func countWorker(filename string, eachReadCount int, byteOffset int64, buffersize int,
	wg *sync.WaitGroup, jc <-chan interface{}, counter chan<- int) {
	var c int = 0
	defer func() {
		// The anonymous function can access variables in the outer scope,
		// so sending c here is fine
		counter <- c
		wg.Done()
		<-jc
	}()
	// Informational output at log level 2
	glog.V(2).Infof("[countWorker] start (offset: %d, read size: %d)\n", byteOffset, eachReadCount*buffersize)
	// Reopen the target file:
	// sharing the original handle would make the Seek cursors interfere
	f, err := os.OpenFile(filename, 0, 0)
	if err != nil {
		return
	}
	defer f.Close()
	// Move to the specified read start position
	_, err = f.Seek(byteOffset, 0)
	if err != nil {
		return
	}
	// You could also pass a bufio.Reader created as below to getNumOfCharsOnIo.
	// Since the read size from the io.Reader is already configurable here,
	// bufio offers little benefit, so it is not used.
	// br := bufio.NewReaderSize(f, 1024*1024)
	c, err = getNumOfCharsOnIo(f, buffersize, eachReadCount)
	if err != nil {
		panic(err)
	}
}
getNumOfCharsOnIo()
Finally, define getNumOfCharsOnIo(), which does the actual line counting. The processing resembles the well-known article "io.Reader sukore", with bytes.IndexByte() used for the part that counts line breaks. Given how UTF-8 works I believe this is fine, but the commented-out IndexRune may be safer. The results matched wc on the real files I checked, though there may be cases where they differ.
// Read buffersize bytes at a time from the io.Reader, repeating repeatCount
// times, and count the occurrences of the newline character.
func getNumOfCharsOnIo(r io.Reader, buffersize int, repeatCount int) (int, error) {
	// Initialize the read buffer
	buf := make([]byte, buffersize)
	// Variable that stores the line count
	var c int = 0
	// Read buffersize bytes at a time into buf
	for j := 0; j < repeatCount; j++ {
		n, err := r.Read(buf)
		// If the read size is 0, stop
		if n == 0 {
			return c, err
		}
		// Handle read errors
		if err != nil {
			return c, err
		}
		// Offset for scanning the contents of the buffer
		of := 0
		// Count the newlines in the buffer
		for {
			// Slice up to n because buf is reused between reads
			// index := bytes.IndexRune(buf[of:n], rune('\n'))
			index := bytes.IndexByte(buf[of:n], '\n')
			// No more newlines; exit the loop
			if index == -1 {
				break
			}
			// Found a newline; increment the counter
			c++
			// Advance the offset to one past the found position
			of += index + 1
		}
	}
	return c, nil
}
I left most of the explanation to the inline comments and kept the walkthrough rough, but that's all for the code.
The whole is here
Since the program takes options, here is a simple example of how to use it.
> ./bin/go-lc --help
(Prints the same help output shown earlier.)
> ./bin/go-lc -f /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
1428
By default, the file is split in two and processed with two threads.
> ./bin/go-lc -f /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql -v 5
I1216 20:47:23.781456 12091 main.go:233] Start
I1216 20:47:23.781785 12091 main.go:79] FileSize : 1426087753 byte
I1216 20:47:23.781801 12091 main.go:80] Read buffer: 1048576 byte
I1216 20:47:23.781816 12091 main.go:81] Max Threads: 2
I1216 20:47:23.781826 12091 main.go:82] Split Num : 2
I1216 20:47:23.781871 12091 main.go:160] [countWorker] start (offset: 713031680, read size: 714080256)
I1216 20:47:23.781953 12091 main.go:160] [countWorker] start (offset: 0, read size: 713031680)
I1216 20:47:23.957093 12091 main.go:115] [receiver] receive: 699
I1216 20:47:23.969989 12091 main.go:115] [receiver] receive: 729
I1216 20:47:23.970048 12091 main.go:242] End(188.280638ms)
1428
It processes a file of about 1.4GB in 0.188 seconds.
> ./bin/go-lc -f /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql -v 5 -s 4 -t 4
I1216 20:51:51.827208 13285 main.go:233] Start
I1216 20:51:51.827519 13285 main.go:79] FileSize : 1426087753 byte
I1216 20:51:51.827534 13285 main.go:80] Read buffer: 1048576 byte
I1216 20:51:51.827553 13285 main.go:81] Max Threads: 4
I1216 20:51:51.827565 13285 main.go:82] Split Num : 4
I1216 20:51:51.827607 13285 main.go:160] [countWorker] start (offset: 1069547520, read size: 357564416)
I1216 20:51:51.827706 13285 main.go:160] [countWorker] start (offset: 713031680, read size: 356515840)
I1216 20:51:51.827646 13285 main.go:160] [countWorker] start (offset: 356515840, read size: 356515840)
I1216 20:51:51.827642 13285 main.go:160] [countWorker] start (offset: 0, read size: 356515840)
I1216 20:51:51.938578 13285 main.go:115] [receiver] receive: 343
I1216 20:51:51.939430 13285 main.go:115] [receiver] receive: 356
I1216 20:51:51.952839 13285 main.go:115] [receiver] receive: 386
I1216 20:51:51.956868 13285 main.go:115] [receiver] receive: 343
I1216 20:51:51.956899 13285 main.go:242] End(129.400448ms)
1428
With the same file, it improves to 0.129 seconds.
> ./bin/go-lc -f /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql -v 5 -s 4 -t 4 -b 1024
I1216 20:53:02.522702 13459 main.go:233] Start
I1216 20:53:02.523194 13459 main.go:79] FileSize : 1426087753 byte
I1216 20:53:02.523217 13459 main.go:80] Read buffer: 1024 byte
I1216 20:53:02.523222 13459 main.go:81] Max Threads: 4
I1216 20:53:02.523229 13459 main.go:82] Split Num : 4
I1216 20:53:02.523275 13459 main.go:160] [countWorker] start (offset: 1069565952, read size: 356521984)
I1216 20:53:02.523351 13459 main.go:160] [countWorker] start (offset: 0, read size: 356521984)
I1216 20:53:02.523442 13459 main.go:160] [countWorker] start (offset: 356521984, read size: 356521984)
I1216 20:53:02.526218 13459 main.go:160] [countWorker] start (offset: 713043968, read size: 356521984)
I1216 20:53:03.146721 13459 main.go:115] [receiver] receive: 343
I1216 20:53:03.149466 13459 main.go:115] [receiver] receive: 386
I1216 20:53:03.186216 13459 main.go:115] [receiver] receive: 356
I1216 20:53:03.190404 13459 main.go:115] [receiver] receive: 343
I1216 20:53:03.190443 13459 main.go:242] End(667.278999ms)
1428
With the smaller buffer, reading became inefficient and it slowed down to 0.667 seconds.
By the way, some readers may be curious how fast this useless processing actually gets. Looking at the usage examples, the best result comes from a buffer size of 1048576 (1024*1024) with 4 splits and 4 threads.
For comparison, the result of wc -l on the same file:
time wc -l /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
(First time)
1428 /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
0.04user 0.26system 0:00.30elapsed 100%CPU (0avgtext+0avgdata 2104maxresident)k
0inputs+0outputs (0major+76minor)pagefaults 0swaps
(Second time)
time wc -l /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
1428 /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
0.03user 0.22system 0:00.26elapsed 99%CPU (0avgtext+0avgdata 2068maxresident)k
0inputs+0outputs (0major+75minor)pagefaults 0swaps
(Third time)
1428 /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
0.03user 0.22system 0:00.26elapsed 99%CPU (0avgtext+0avgdata 2124maxresident)k
0inputs+0outputs (0major+75minor)pagefaults 0swaps
(4th)
1428 /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
0.04user 0.20system 0:00.25elapsed 99%CPU (0avgtext+0avgdata 2104maxresident)k
0inputs+0outputs (0major+78minor)pagefaults 0swaps
(5th time)
1428 /mnt/v01/resource/wikipedia/jawiki/20191001/extract/jawiki-20191001-categorylinks.sql
0.04user 0.23system 0:00.27elapsed 100%CPU (0avgtext+0avgdata 2068maxresident)k
0inputs+0outputs (0major+75minor)pagefaults 0swaps
average: 0.268s
The average of 5 times was 0.268 seconds.
The machine environment is all Google Cloud Platform Compute Engine (n1-standard-4 (vCPU x 4, memory 15 GB)), and the target file is a 1.4 GB Wikidump sql file.
With 4 splits it took 0.129 seconds, beating the wc -l command. The waste was worth it.
To be honest, the result above is probably helped by GCE's file cache. When I tried another file of about 20 GB, it took roughly 3 minutes, as shown below. At that size it is still faster than wc, but given that 1.4 GB was processed in about 0.2 seconds, 20 GB ought to be much faster than 3 minutes, so files of a few GB are likely being cached. In other words, efficiency can differ before and after caching. Also, GCE disks are mounted over the network, so speed depends on network conditions too. Please treat the results above as reference values only.
Below are the results of running wc -l and this program on the roughly 20 GB file.
> time wc -l /mnt/v01/resource/wikipedia/enwiki/20191001/extract/enwiki-20191001-categorylinks.sql
(wc 1st time)
19525 /mnt/v01/resource/wikipedia/enwiki/20191001/extract/enwiki-20191001-categorylinks.sql
0.91user 8.21system 3:58.40elapsed 3%CPU (0avgtext+0avgdata 2116maxresident)k
31893064inputs+0outputs (0major+75minor)pagefaults 0swaps
(wc second time)
19525 /mnt/v01/resource/wikipedia/enwiki/20191001/extract/enwiki-20191001-categorylinks.sql
0.86user 7.86system 3:09.32elapsed 4%CPU (0avgtext+0avgdata 2044maxresident)k
26220800inputs+0outputs (0major+77minor)pagefaults 0swaps
(go-lc 1st time)
bin/go-lc -f /mnt/v01/resource/wikipedia/enwiki/20191001/extract/enwiki-20191001-categorylinks.sql -v 5 -t 4 -s 4
I1216 21:19:05.381636 14202 main.go:233] Start
I1216 21:19:05.384584 14202 main.go:79] FileSize : 20234931295 byte
I1216 21:19:05.384601 14202 main.go:80] Read buffer: 1048576 byte
I1216 21:19:05.384605 14202 main.go:81] Max Threads: 4
I1216 21:19:05.384609 14202 main.go:82] Split Num : 4
I1216 21:19:05.384704 14202 main.go:160] [countWorker] start (offset: 15176040448, read size: 5059379200)
I1216 21:19:05.384733 14202 main.go:160] [countWorker] start (offset: 0, read size: 5058330624)
I1216 21:19:05.384786 14202 main.go:160] [countWorker] start (offset: 5058330624, read size: 5058330624)
I1216 21:19:05.384859 14202 main.go:160] [countWorker] start (offset: 10116661248, read size: 5059379200)
I1216 21:19:06.836037 14202 main.go:115] [receiver] receive: 4881
I1216 21:20:49.994339 14202 main.go:115] [receiver] receive: 4866
I1216 21:20:56.968630 14202 main.go:115] [receiver] receive: 4910
I1216 21:21:00.825423 14202 main.go:115] [receiver] receive: 4868
I1216 21:21:00.825466 14202 main.go:242] End(1m55.440902834s)
19525
(go-lc 2nd time)
bin/go-lc -f /mnt/v01/resource/wikipedia/enwiki/20191001/extract/enwiki-20191001-categorylinks.sql -v 5 -t 4 -s 4
I1216 21:21:19.065087 14343 main.go:233] Start
I1216 21:21:19.066146 14343 main.go:79] FileSize : 20234931295 byte
I1216 21:21:19.066164 14343 main.go:80] Read buffer: 1048576 byte
I1216 21:21:19.066169 14343 main.go:81] Max Threads: 4
I1216 21:21:19.066182 14343 main.go:82] Split Num : 4
I1216 21:21:19.066232 14343 main.go:160] [countWorker] start (offset: 15176040448, read size: 5059379200)
I1216 21:21:19.066234 14343 main.go:160] [countWorker] start (offset: 0, read size: 5058330624)
I1216 21:21:19.066314 14343 main.go:160] [countWorker] start (offset: 5058330624, read size: 5058330624)
I1216 21:21:19.066377 14343 main.go:160] [countWorker] start (offset: 10116661248, read size: 5059379200)
I1216 21:21:20.477393 14343 main.go:115] [receiver] receive: 4881
I1216 21:23:04.790516 14343 main.go:115] [receiver] receive: 4910
I1216 21:23:35.783612 14343 main.go:115] [receiver] receive: 4868
I1216 21:23:53.859878 14343 main.go:115] [receiver] receive: 4866
I1216 21:23:53.859920 14343 main.go:242] End(2m34.793812658s)
19525
This program was created as an experiment in processing a huge text file, such as a Wikipedia dump, at high speed without loading the whole thing.
Since each worker obtains its own read handle, I also tried mounting 4 disks on GCE, copying the same file to each, and distributing the reads across the disks, but, presumably because the communication speed to the NAS is the cap, I did not get the expected effect (it was almost the same speed as reading from a single disk). GCS, GCP's storage service, supports byte-range reads, so doing the split reads against GCS might be faster. This is where io.Reader pays off: in Go, the same counting code can be reused whether it reads from a file or via the GCS API.
Thank you for staying with me to the end of this article about elaborately reinventing the wheel.