Limit the number of goroutines and aggregate results

An alternative name for this topic: “Everything I have to know about channels in Go”. First of all, I would like to share a link to my repo with the completed task for those who don’t want to dive into the details: https://github.com/golovdanya/limited_goroutines
The task to solve, in detail
There is a flow of input data on stdin (paths to local files and URLs). The program should count the number of words “Go” in all sources and print it to stdout. The computation should be done with a specified, limited number of goroutines. If there are fewer data sources than the specified number of goroutines, redundant goroutines should not be created.
The approach used
Let’s use one main goroutine to parse stdin and pass the data to a number of worker goroutines through a channel of strings (let’s call it “urls”):
urls := make(chan string)

for scanner.Scan() {
    // ... some code that will be described later
    url := scanner.Text()
    urls <- url
}
Obviously, the main goroutine will block after each send operation (the channel is unbuffered), so it is impossible to read data from that channel in the same main goroutine. We will use the specified number of “worker” goroutines to read data from the “urls” channel and send results to the “results” channel. Here is the scheme of this approach:
maxWorkers := 3 // this is the number of workers (the limit)
workers := 0
urls := make(chan string)
results := make(chan int)

for scanner.Scan() {
    // create a worker goroutine only if input data exists
    if workers < maxWorkers {
        go worker(urls, results)
        workers++
    }
    url := scanner.Text()
    urls <- url
}
Let’s take a look at the worker goroutine. It waits for data on the “urls” channel and then writes results into the “results” channel:
func worker(urls <-chan string, results chan<- int) {
    // receive data from the channel until it is closed
    for url := range urls {
        // do some work to get the number of "Go" words
        number := countGoWords(url) // the counting itself is omitted here
        // write the result into the "results" channel
        results <- number
    }
    // later we will notify that the work of this goroutine is done
}
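The counting itself is out of scope of the article; as a sketch, counting whitespace-separated words equal to “Go” in an already-fetched text could look like this (countGo is an illustrative name, not part of the original repo):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// countGo counts whitespace-separated words equal to "Go" in the input.
// The real program would wrap this around a file or an HTTP response body.
func countGo(text string) int {
	scanner := bufio.NewScanner(strings.NewReader(text))
	scanner.Split(bufio.ScanWords) // split the input into words
	count := 0
	for scanner.Scan() {
		if scanner.Text() == "Go" {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(countGo("Go is fun, so learn Go")) // prints 2
}
```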
The worker goroutines will try to read from the “urls” channel until it is closed, so we have to close it manually as soon as all the input data has been sent into the channel:
close(urls)
Somebody should also read from the “results” channel: worker goroutines will block on each send until another goroutine receives from it. So let’s create one special goroutine, a kind of aggregator. It should be started at the beginning of the program. Let’s use an anonymous goroutine which runs right after its declaration:
go func() {
    totalCount := 0
    for res := range results {
        totalCount += res
    }
    fmt.Println("Total:", totalCount)
    // later we will notify the main goroutine that aggregation is done
}()
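In isolation, this aggregator pattern can be sketched as follows (the aggregate helper and its done channel are illustrative names; the point is that closing “results” is what terminates the range loop):

```go
package main

import "fmt"

// aggregate sums everything received from results until the channel is
// closed, then reports the total on done.
func aggregate(results <-chan int, done chan<- int) {
	total := 0
	for res := range results {
		total += res
	}
	done <- total
}

func main() {
	results := make(chan int)
	done := make(chan int)
	go aggregate(results, done)
	for _, n := range []int{1, 2, 3} {
		results <- n
	}
	close(results)                // makes the range loop in aggregate finish
	fmt.Println("Total:", <-done) // prints Total: 6
}
```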
Also, we do not want the main goroutine to continue until all worker goroutines complete their work, so we should collect the “done” statuses of all workers and receive them in the main goroutine:
func worker(urls <-chan string, results chan<- int, doneWorkers chan<- bool) {
    // receive data from the channel until it is closed
    for url := range urls {
        // do some work to get the number of "Go" words
        number := countGoWords(url) // the counting itself is omitted here
        // write the result into the "results" channel
        results <- number
    }
    // notify that the work of this goroutine is done
    doneWorkers <- true
}

func main() {
    // ...
    doneWorkers := make(chan bool)
    for i := 0; i < workers; i++ {
        <-doneWorkers
    }
    // ...
}
The aggregation goroutine is stopped the same way as the workers, by closing the channel it ranges over:
close(results)
And finally, we do not want the main goroutine to finish until the aggregation goroutine completes its job, so let’s create another channel to carry the status of this goroutine:
doneAnalyze := make(chan bool)

go func() {
    totalCount := 0
    for res := range results {
        totalCount += res
    }
    fmt.Println("Total:", totalCount)
    doneAnalyze <- true
}()

// ...

<-doneAnalyze
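Putting all the pieces together, here is a minimal end-to-end sketch. To keep it runnable without real files or network access, it counts “Go” words in in-memory strings; the run and countGoWords helpers are illustrative (the article prints the total and signals with a bool, while this sketch returns the total so it can be checked):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// countGoWords counts words equal to "Go" in text; in the real program the
// text would come from a local file or an HTTP GET of the URL.
func countGoWords(text string) int {
	sc := bufio.NewScanner(strings.NewReader(text))
	sc.Split(bufio.ScanWords)
	n := 0
	for sc.Scan() {
		if sc.Text() == "Go" {
			n++
		}
	}
	return n
}

func worker(sources <-chan string, results chan<- int, doneWorkers chan<- bool) {
	for src := range sources {
		results <- countGoWords(src)
	}
	doneWorkers <- true
}

// run processes the inputs with at most maxWorkers workers (maxWorkers >= 1)
// and returns the total count.
func run(inputs []string, maxWorkers int) int {
	sources := make(chan string)
	results := make(chan int)
	doneWorkers := make(chan bool)
	doneAnalyze := make(chan int)

	// aggregator: sums results until the channel is closed
	go func() {
		total := 0
		for res := range results {
			total += res
		}
		doneAnalyze <- total
	}()

	// start workers lazily, never more than maxWorkers
	workers := 0
	for _, in := range inputs {
		if workers < maxWorkers {
			go worker(sources, results, doneWorkers)
			workers++
		}
		sources <- in
	}
	close(sources)

	// wait for all workers, then stop the aggregator
	for i := 0; i < workers; i++ {
		<-doneWorkers
	}
	close(results)
	return <-doneAnalyze
}

func main() {
	inputs := []string{"Go Go Go", "no match", "learn Go today"}
	fmt.Println("Total:", run(inputs, 2)) // prints Total: 4
}
```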
Conclusion
Goroutines are a perfect way to build concurrent code in Go, and channels are an incredible tool for communication between goroutines. I expect the described task could be solved in another, simpler way, so feel free to comment. Thanks a lot!