Concurrency in Golang


Coming from a PHP background, I instantly fell in love with Go after checking out the syntax and building small projects with it. What stood out most to me was Go's simple approach to lower-level operations, ranging from references and pointers to concurrency.

In this article, I will share my experience with concurrency with the aid of a small tool. The program fetches issues from the xkcd comics website and downloads each URL to build an offline JSON index. At the time of writing, there are over 2500 comics (URLs) to download.

Why concurrency?

Much has been written on the concurrency feature of Go so I’ll just share my experience on what I know it does for this project. As stated earlier, the xkcd website has over 2500 comics to download. To do this sequentially (that is, one at a time), it would take a long time (probably hours). If you happen to be very patient, there is still a very high chance the operation would fail due to factors such as the rate limiting feature on the website. It would not make any sense to download this resource sequentially (trust me, I tried).

By using a concurrent model, I was able to implement a Worker pool (to be explained later) to handle multiple HTTP requests at a time, keeping the connection alive and getting multiple results in a very short time.

What is this concurrent model? In Go, it simply means creating multiple goroutines, each handling part of the work. A goroutine is Go’s way of achieving concurrency: a function that runs concurrently with other functions. A goroutine can be compared to a lightweight thread (although it is not an operating system thread, as the Go runtime schedules many goroutines onto a small number of threads), which makes it cheap to create and to switch between. Given enough memory, a single program can run hundreds of thousands of goroutines, or even a million. When two or more goroutines are running, they need a way to communicate with each other. That’s where channels come in.
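As a quick illustration of the two ideas, here is a minimal sketch (not part of the xkcd tool) in which each goroutine does a small piece of work and reports back over a channel. The names square and sumSquares are made up for this example.

```go
package main

import "fmt"

// square computes n*n and sends the result on out.
func square(n int, out chan<- int) {
	out <- n * n
}

// sumSquares starts one goroutine per input and adds up what they send back.
func sumSquares(nums []int) int {
	out := make(chan int)
	for _, n := range nums {
		go square(n, out) // runs concurrently with this loop
	}
	total := 0
	for range nums {
		total += <-out // blocks until some goroutine has sent a value
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3})) // 1 + 4 + 9 = 14
}
```

The goroutines may finish in any order; the sum does not care, which is why this example stays deterministic.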

To build this program, we will depend heavily on goroutines and channels, and to maintain the focus of this article, I will leave links below to explain these fundamental concepts better.

Planning and Design

The xkcd website features a JSON interface that lets external services read its comic data. We will be downloading the data from this interface to build our offline index.

Based on the fields this interface returns for a comic, we can design our struct. This struct is the model for the data we want to extract when handling the JSON:

package main

type Result struct {
	Month      string `json:"month"`
	Num        int    `json:"num"`
	Link       string `json:"link"`
	Year       string `json:"year"`
	News       string `json:"news"`
	SafeTitle  string `json:"safe_title"`
	Transcript string `json:"transcript"`
	Alt        string `json:"alt"`
	Img        string `json:"img"`
	Title      string `json:"title"`
	Day        string `json:"day"`
}
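To see how the struct tags map JSON keys onto fields, here is a minimal sketch using a trimmed-down copy of the struct. The comic type, the decode helper and the sample payload are all made up for illustration; the payload is not a real response from the site.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// comic is a trimmed-down, hypothetical copy of Result, kept short for the example.
type comic struct {
	Num       int    `json:"num"`
	Title     string `json:"title"`
	SafeTitle string `json:"safe_title"`
	Year      string `json:"year"`
}

// sample is an invented payload shaped like the fields above.
const sample = `{"num": 1, "title": "Example", "safe_title": "Example", "year": "2020", "extra": "ignored"}`

// decode parses raw JSON into a comic; keys without a matching field are ignored.
func decode(raw string) (comic, error) {
	var c comic
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

func main() {
	c, err := decode(sample)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("#%d %s (%s)\n", c.Num, c.Title, c.Year) // #1 Example (2020)
}
```

Note that num is a JSON number while year is a JSON string, which is why the struct mixes int and string fields.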

Fetching the comic

Now, before we jump into concurrency, we want to establish a function that serves the core purpose of the application — fetching the comic. The function has to be independent of our architecture and give room for re-usability across the program. I’ll explain each step below:

func fetch(n int) (*Result, error) {
	client := &http.Client{
		Timeout: 5 * time.Minute,
	}

	// concatenate strings to get url; ex: https://xkcd.com/571/info.0.json
	url := strings.Join([]string{Url, fmt.Sprintf("%d", n), "info.0.json"}, "/")

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("http request: %v", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http err: %v", err)
	}
	// close the body on every return path, including a failed decode
	defer resp.Body.Close()

	var data Result

	// error from web service, empty struct to avoid disruption of process
	if resp.StatusCode != http.StatusOK {
		return &data, nil
	}

	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, fmt.Errorf("json err: %v", err)
	}

	return &data, nil
}

First we create a custom HTTP client and set its timeout to 5 minutes. After joining the URL parts using the strings package, we create a new request and send it using the previously created client. If the web service answers with a status other than 200 OK, we return an empty struct so that one missing issue does not disrupt the whole run; otherwise we decode the JSON into our local struct. The response body is closed, and we return a pointer to the struct.
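If you would rather exercise these steps without hitting the real site, one option is a local test server from the standard net/http/httptest package. The sketch below is only an illustration: item, fetchFrom and newFakeServer are hypothetical names, the payload is invented, and the timeout is shortened to 5 seconds because everything runs locally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"time"
)

// item is a hypothetical two-field stand-in for Result.
type item struct {
	Num   int    `json:"num"`
	Title string `json:"title"`
}

// fetchFrom follows the same steps as fetch, but takes the base URL as a parameter.
func fetchFrom(base string, n int) (*item, error) {
	client := &http.Client{Timeout: 5 * time.Second}

	resp, err := client.Get(fmt.Sprintf("%s/%d/info.0.json", base, n))
	if err != nil {
		return nil, fmt.Errorf("http err: %v", err)
	}
	defer resp.Body.Close()

	var data item
	if resp.StatusCode != http.StatusOK {
		return &data, nil // empty struct for a missing issue
	}
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, fmt.Errorf("json err: %v", err)
	}
	return &data, nil
}

// newFakeServer serves issue 1 and answers 404 for every other path.
func newFakeServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/1/info.0.json" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprint(w, `{"num": 1, "title": "Example"}`)
	}))
}

func main() {
	srv := newFakeServer()
	defer srv.Close()

	found, err := fetchFrom(srv.URL, 1)
	if err != nil {
		log.Fatal(err)
	}
	missing, err := fetchFrom(srv.URL, 2)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(found.Title, missing.Num) // Example 0
}
```

The second call shows the "empty struct" path: a 404 is not treated as an error, it just yields a result whose Num is 0.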

Confirm it works

So far we have implemented the core structure of the application. Let’s run this part to ensure our code works as expected. Here’s the complete code so far:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"
)

type Result struct {
	Month      string `json:"month"`
	Num        int    `json:"num"`
	Link       string `json:"link"`
	Year       string `json:"year"`
	News       string `json:"news"`
	SafeTitle  string `json:"safe_title"`
	Transcript string `json:"transcript"`
	Alt        string `json:"alt"`
	Img        string `json:"img"`
	Title      string `json:"title"`
	Day        string `json:"day"`
}

const Url = "https://xkcd.com"

func fetch(n int) (*Result, error) {
	client := &http.Client{
		Timeout: 5 * time.Minute,
	}

	// concatenate strings to get url; ex: https://xkcd.com/571/info.0.json
	url := strings.Join([]string{Url, fmt.Sprintf("%d", n), "info.0.json"}, "/")

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("http request: %v", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http err: %v", err)
	}
	// close the body on every return path, including a failed decode
	defer resp.Body.Close()

	var data Result

	// error from web service, empty struct to avoid disruption of process
	if resp.StatusCode != http.StatusOK {
		return &data, nil
	}

	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, fmt.Errorf("json err: %v", err)
	}

	return &data, nil
}

func main() {
	n := 200
	result, err := fetch(n)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%v\n", result.Title)
}

Expected output for the title is “Bill Nye”, which is the title for issue #200. You can change the issue number to verify further.

Channels Setup

As stated earlier, we will be creating a Worker pool to concurrently handle the operations. To do that, we have to set up buffered channels. A buffered channel is simply a channel with a specified capacity. With a buffered channel, send operations are blocked when the buffer is full and receive operations are blocked when the buffer is empty. We need this feature because in a Worker Pool, we assign multiple jobs to a number of workers and we want to ensure they are handled in an organized way. An example:

ch := make(chan int, 6)

If we have 6 workers in our worker pool, this buffered channel lets up to 6 jobs queue up without blocking the sender; a seventh send waits until a worker has received one of the queued jobs.
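The capacity rule is easy to observe with a non-blocking send. This is a small sketch with a capacity of 2 to keep it short; trySend is a made-up helper, not something the tool uses.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer is full and nobody is receiving
	}
}

func main() {
	ch := make(chan int, 2)

	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // true
	fmt.Println(trySend(ch, 3)) // false: both slots are taken

	<-ch // a receive frees one slot

	fmt.Println(trySend(ch, 3))   // true
	fmt.Println(len(ch), cap(ch)) // 2 2
}
```

A plain send (ch <- v) in place of trySend would simply wait at the third value instead of returning false.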

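// Job carries the number of the issue a worker should fetch.
type Job struct {
	number int
}

var jobs = make(chan Job, 100)
var results = make(chan Result, 100)
var resultCollection []Result

// allocateJobs queues issues 1..noOfJobs and then closes the channel.
func allocateJobs(noOfJobs int) {
	for i := 0; i < noOfJobs; i++ {
		jobs <- Job{i + 1}
	}
	close(jobs)
}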

After creating the buffered channels and setting up the final results variable, we created a function to allocate jobs to the jobs channel. Each Job simply carries the number of the issue to fetch. If no worker is receiving yet, the loop blocks once the buffer holds 100 jobs, and no new job is added until a worker takes one off the channel. After all jobs have been allocated, the jobs channel is closed so that the workers know there is nothing more to receive.
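The same produce-then-close pattern can be tried in isolation. In this sketch (produce and drain are hypothetical names, and plain integers stand in for jobs) the buffer is deliberately much smaller than the number of values, so the producer has to wait for the consumer several times.

```go
package main

import "fmt"

// produce sends 1..n on out and then closes it.
func produce(n int, out chan<- int) {
	for i := 1; i <= n; i++ {
		out <- i // blocks whenever the buffer is full
	}
	close(out)
}

// drain receives until the channel is closed, returning the count and sum of the values.
func drain(in <-chan int) (count, sum int) {
	for v := range in { // the loop ends once in is closed and empty
		count++
		sum += v
	}
	return count, sum
}

func main() {
	ch := make(chan int, 3) // far fewer slots than values
	go produce(10, ch)

	count, sum := drain(ch)
	fmt.Println(count, sum) // 10 55
}
```

Without the close call, the range loop in drain would wait forever after the tenth value.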

Create the Worker pool

A worker pool maintains multiple threads (or in our case, goroutines) that wait for tasks (jobs) to be assigned to them. For example, let’s say we have 1000 jobs. We create a worker pool which spawns 100 workers. If the jobs channel is buffered with a capacity of 100, the workers take jobs off the channel, and as they do, the freed slots are filled with newly allocated jobs, which in turn go to whichever workers are free, and so on until all 1000 jobs are done.

Our worker pool will make use of Go’s WaitGroup, a synchronization primitive (type) that tells the main goroutine to wait for a collection of goroutines to finish.
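Before applying it to the pool, here is a minimal sketch of WaitGroup on its own. The runAll helper is made up for this example; it starts n goroutines and returns only after all of them have finished.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll starts n goroutines and returns how many finished, after waiting for all of them.
func runAll(n int) int64 {
	var wg sync.WaitGroup
	var finished int64

	for i := 0; i < n; i++ {
		wg.Add(1) // one more goroutine to wait for
		go func() {
			defer wg.Done() // counter goes down when this goroutine returns
			atomic.AddInt64(&finished, 1)
		}()
	}

	wg.Wait() // blocks until the counter is back to zero
	return finished
}

func main() {
	fmt.Println(runAll(50)) // 50
}
```

If wg.Wait() were removed, runAll could return before some goroutines had run, and the count would be unpredictable.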

Here’s a simple implementation for this project:

func worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		result, err := fetch(job.number)
		if err != nil {
			// result is nil on error, so skip it instead of dereferencing it
			log.Printf("error in fetching: %v\n", err)
			continue
		}
		results <- *result
	}
}

func createWorkerPool(noOfWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(results)
}

In the code, we first define a worker function. The worker receives jobs from the jobs channel until it is closed, fetches each comic, and sends the value to the results channel. In the createWorkerPool function, we use a WaitGroup to set up the worker pool. Each wg.Add(1) call increments the WaitGroup counter before a worker starts, and each worker calls wg.Done() to decrement it when it runs out of jobs. The wg.Wait() call blocks until the counter is back to zero, that is, until every worker has finished. Control then returns to the caller and the results channel is closed, which is safe because no worker is left to write to it.

Get the results

The results are being added to the results channel we created. However, it is buffered and can only hold 100 at a time. We need a separate goroutine to retrieve the results and make room for the ones that follow. Here’s how we do that:

func getResults(done chan bool) {
	for result := range results {
		if result.Num != 0 {
			fmt.Printf("Retrieving issue #%d\n", result.Num)
			resultCollection = append(resultCollection, result)
		}
	}
	done <- true
}

A non-200 response leaves the result empty, so its Num is 0; any result with a non-zero Num is valid and we append it to the results collection. We have a boolean channel named “done”; we will use it to signal that all the results have been collated.
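The role of the “done” channel is easier to see in a stripped-down version. In this sketch (collect and gather are hypothetical names, and integers stand in for results) the caller must not read the collected slice until it has received from done.

```go
package main

import "fmt"

// collect appends every value from in to *out, then signals on done.
func collect(in <-chan int, out *[]int, done chan<- bool) {
	for v := range in {
		*out = append(*out, v)
	}
	done <- true
}

// gather feeds values through a channel to a collector goroutine and waits for it.
func gather(values []int) []int {
	in := make(chan int, 2)
	done := make(chan bool)
	var collected []int

	go collect(in, &collected, done)

	for _, v := range values {
		in <- v
	}
	close(in) // lets the collector's range loop finish

	<-done // collected is safe to read only after this receive
	return collected
}

func main() {
	fmt.Println(gather([]int{3, 1, 2})) // [3 1 2]
}
```

With a single sender and a single collector the order is preserved; once many workers write to the same channel, as in the tool, results arrive in whatever order the workers finish.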

Putting it all together

We have a bunch of functions, variables and type declarations, but how do we put them together? Which function is executed first and why? In this last section, we will see how it all comes together.

Here’s the code for the main function:

func main() {
	// allocate jobs
	noOfJobs := 3000
	go allocateJobs(noOfJobs)

	// get results
	done := make(chan bool)
	go getResults(done)

	// create worker pool
	noOfWorkers := 100
	createWorkerPool(noOfWorkers)

	// wait for all results to be collected
	<-done

	// convert result collection to JSON
	data, err := json.MarshalIndent(resultCollection, "", " ")
	if err != nil {
		log.Fatal("json err: ", err)
	}

	// write json data to file
	err = writeToFile(data)
	if err != nil {
		log.Fatal(err)
	}
}

func writeToFile(data []byte) error {
	f, err := os.Create("xkcd.json")
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = f.Write(data)
	if err != nil {
		return err
	}
	return nil
}

First, we allocate jobs. We use 3000 because at the time of writing, xkcd has over 2500 comic issues, and we want to make sure we get all of them.

Exercise: Create a small program that tells you exactly how many issues are on the xkcd website, to remove the need for an estimate.

  • To allocate, we start a goroutine. Note that this goroutine will block once 100 jobs have been added to the channel. It will wait for another goroutine to read the jobs channel.

  • We start a goroutine to collect the results. Why do this now? Well, the results channel is currently empty. Trying to read data from it will block the routine, until data has been written to the channel.

  • That makes it 2 goroutines blocked and waiting for read and write operations.

  • We create the Worker pool. This spawns many workers (100 in our example) and they read from the jobs channel, and write to the results channel.

  • That begins to satisfy the 2 blocked goroutines we had earlier.

  • We get the value of the “done” boolean channel to ensure all results have been collected.

  • Then we convert to JSON and write the data to file.
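The sequence above can be tried end to end without touching the network. The sketch below is a scaled-down illustration under stated assumptions: process is a hypothetical stand-in for fetch, run is a made-up wrapper, and the channels are local variables rather than package-level ones.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// process stands in for fetch; it is a hypothetical, network-free unit of work.
func process(n int) int { return n * 2 }

// run pushes 1..noOfJobs through noOfWorkers workers and returns the sorted results.
func run(noOfJobs, noOfWorkers int) []int {
	jobs := make(chan int, 10)
	results := make(chan int, 10)
	done := make(chan bool)
	var collected []int

	// 1. allocate jobs in the background, then close the channel
	go func() {
		for i := 1; i <= noOfJobs; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// 2. collect results in the background
	go func() {
		for r := range results {
			collected = append(collected, r)
		}
		done <- true
	}()

	// 3. start the workers and wait for all of them
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- process(j)
			}
		}()
	}
	wg.Wait()
	close(results) // no worker is left to write

	// 4. wait for the collector to finish
	<-done

	sort.Ints(collected) // workers finish in no particular order
	return collected
}

func main() {
	fmt.Println(run(5, 3)) // [2 4 6 8 10]
}
```

The sort at the end is only there to make the output predictable. The same applies to the real tool: issues land in the collection in completion order, so sort by Num if the order of the index matters to you.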

Complete Code

Here’s the complete code for the project:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"
)

type Result struct {
	Month      string `json:"month"`
	Num        int    `json:"num"`
	Link       string `json:"link"`
	Year       string `json:"year"`
	News       string `json:"news"`
	SafeTitle  string `json:"safe_title"`
	Transcript string `json:"transcript"`
	Alt        string `json:"alt"`
	Img        string `json:"img"`
	Title      string `json:"title"`
	Day        string `json:"day"`
}

const Url = "https://xkcd.com"

func fetch(n int) (*Result, error) {
	client := &http.Client{
		Timeout: 5 * time.Minute,
	}

	// concatenate strings to get url; ex: https://xkcd.com/571/info.0.json
	url := strings.Join([]string{Url, fmt.Sprintf("%d", n), "info.0.json"}, "/")

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("http request: %v", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http err: %v", err)
	}
	// close the body on every return path, including a failed decode
	defer resp.Body.Close()

	var data Result

	// error from web service, empty struct to avoid disruption of process
	if resp.StatusCode != http.StatusOK {
		return &data, nil
	}

	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, fmt.Errorf("json err: %v", err)
	}

	return &data, nil
}

// Job carries the number of the issue a worker should fetch.
type Job struct {
	number int
}

var jobs = make(chan Job, 100)
var results = make(chan Result, 100)
var resultCollection []Result

// allocateJobs queues issues 1..noOfJobs and then closes the channel.
func allocateJobs(noOfJobs int) {
	for i := 0; i < noOfJobs; i++ {
		jobs <- Job{i + 1}
	}
	close(jobs)
}

func worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		result, err := fetch(job.number)
		if err != nil {
			// result is nil on error, so skip it instead of dereferencing it
			log.Printf("error in fetching: %v\n", err)
			continue
		}
		results <- *result
	}
}

func createWorkerPool(noOfWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(results)
}

func getResults(done chan bool) {
	for result := range results {
		if result.Num != 0 {
			fmt.Printf("Retrieving issue #%d\n", result.Num)
			resultCollection = append(resultCollection, result)
		}
	}
	done <- true
}

func main() {
	// allocate jobs
	noOfJobs := 3000
	go allocateJobs(noOfJobs)

	// get results
	done := make(chan bool)
	go getResults(done)

	// create worker pool
	noOfWorkers := 100
	createWorkerPool(noOfWorkers)

	// wait for all results to be collected
	<-done

	// convert result collection to JSON
	data, err := json.MarshalIndent(resultCollection, "", " ")
	if err != nil {
		log.Fatal("json err: ", err)
	}

	// write json data to file
	err = writeToFile(data)
	if err != nil {
		log.Fatal(err)
	}
}

func writeToFile(data []byte) error {
	f, err := os.Create("xkcd.json")
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = f.Write(data)
	if err != nil {
		return err
	}
	return nil
}

https://blog.devgenius.io/concurrency-with-sample-project-in-golang-297400beb0a4