Fun with IO pipes and chunked file uploads in Go
written by: Ryan Schachte, January 23rd, 2023

IO pipes provide a mechanism for redirecting data between processes or threads, enabling powerful communication in multi-threaded systems. In this article, we will explore using IO pipes to construct a concurrent uploader for cloud storage providers such as S3 or Cloudflare R2.

Intuition

Let’s say we want a program that grabs the DNS records for a given domain and outputs the IP addresses associated with it. We might hack together a simple program like the following.

// grab server IP addresses
cmd := exec.Command("dig", "ryan-schachte.com", "+short")
cmd.Run()

Running this program doesn’t output anything because we haven’t redirected the output anywhere. This leads us to a larger question: how do we get the output to show up?

When using shells like Bash, we have three standard streams available for input and output, each identified by a small integer file descriptor.

  • 0 - stdin
  • 1 - stdout
  • 2 - stderr

When streams of data are sent to the stdout or stderr file descriptors, they are printed to the screen directly. However, we can also redirect either stream elsewhere, for example sending error output to one file and regular output to another. Let’s validate our intuition so far by writing a simple bash program.

#!/bin/bash

input=$1

if [ "$input" == "test_error" ]; then
    echo "output" >&2
    exit 1
elif [ "$input" == "test_success" ]; then
    echo "output"
    exit 0
else
    echo "Input is neither test_error nor test_success" >&2
    exit 1
fi

Above, we take in user input and redirect the error message to the stderr file descriptor. In the success case, the output message goes to stdout, which is echo’s default. Remember, file descriptors are just integers representing open files (see above). Let’s run a couple of sample commands and validate that this is working.

./fd_test.sh test_error >&2

Now, this isn’t too interesting because we just see what we expect, which is:

output

This is basically saying: take the program’s stdout and redirect it to stderr, which prints to the terminal by default. (The error message was already written to stderr by the script, so it shows up either way.)

But what happens if we instead tell it to take any output from stderr and redirect it to /dev/null like so:

./fd_test.sh test_error 2>/dev/null

Nothing! In this case, we take the contents of stderr and redirect them to /dev/null instead of the terminal, and as a result, we see nothing. Interesting. Let’s run a couple more tests.

What happens if we take error data and redirect it to stdout and take success data and redirect it to /dev/null?

./fd_test.sh test_error 2>&1 1>/dev/null
./fd_test.sh test_success 1>/dev/null 2>&1
  • Invocation 1:
    • 2>&1 points stderr at the terminal’s stdout first, so the error message still prints
    • 1>/dev/null then discards anything written to stdout
  • Invocation 2:
    • 1>/dev/null discards stdout first
    • 2>&1 then points stderr at the already-redirected stdout, so errors would also land in /dev/null

In the first call, we see the error because the program wrote to stderr, and stderr was pointed at the terminal before stdout was discarded. In the second call, despite a successful run, we see nothing: stdout is redirected to /dev/null, and because the 2>&1 comes after that redirection, stderr follows it to /dev/null as well (not that it matters here, since a successful run produces no errors).

This kind of classification of program output is very useful for logging, error handling and output filtering.
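
To make the same classification concrete in Go, here is a minimal sketch that captures a subprocess’s stdout and stderr into separate buffers and decides what to do based on the exit status (it assumes the fd_test.sh script above sits in the working directory):

package main

import (
    "bytes"
    "fmt"
    "os/exec"
)

func main() {
    // capture stdout and stderr into separate buffers so we can
    // classify the program's output after it finishes
    var stdout, stderr bytes.Buffer
    cmd := exec.Command("./fd_test.sh", "test_error")
    cmd.Stdout = &stdout
    cmd.Stderr = &stderr

    if err := cmd.Run(); err != nil {
        // non-zero exit: treat whatever landed on stderr as the error
        fmt.Printf("command failed: %v, stderr: %s", err, stderr.String())
        return
    }
    fmt.Printf("command succeeded, stdout: %s", stdout.String())
}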

Sample Application

Ok, getting back to the program we started with, which didn’t output anything to the console.

// grab server IP addresses
cmd := exec.Command("dig", "ryan-schachte.com", "+short")
cmd.Run()

Now that we know a bit more about file descriptors and output, let’s redirect this output to stdout.

// grab server IP addresses
cmd := exec.Command("dig", "ryan-schachte.com", "+short")
cmd.Stdout = os.Stdout
cmd.Run()

In this case, we are just redirecting the output directly to the terminal. Let’s give it a shot:

go run main.go

104.18.25.143
104.18.24.143

Neat! Now, that raises a larger question: does it always need to be the terminal’s stdout? What if I wanted to pipe the output directly into a file?

// grab server IP addresses
file, _ := os.OpenFile("ip_addresses.txt", os.O_CREATE|os.O_WRONLY, 0666)
defer file.Close()

cmd := exec.Command("dig", "ryan-schachte.com", "+short")
cmd.Stdout = file
cmd.Run()

Now, the only difference here is that I’m telling stdout to be the file instead of the terminal. Let’s rerun.

go run main.go

No output! This is expected, but where did the output go? Into the file ip_addresses.txt. We should now be able to cat the file like so:

cat ip_addresses.txt

104.18.25.143
104.18.24.143

Let’s summarize what we know so far:

  • Output from our program is typically sent to file descriptors like stdout and stderr, which helps us classify whether something went wrong and makes it easier to sift through log statements.
  • We have a lot of options for redirecting our program’s output, and we can even target files explicitly if we aren’t interested in printing to the console (see the sketch below).
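
As a quick Go recap of both points, this small sketch sends dig’s stdout to a file while leaving its stderr on the terminal (the file name is just an example):

package main

import (
    "log"
    "os"
    "os/exec"
)

func main() {
    // regular output goes to a file, errors stay on the terminal
    file, err := os.OpenFile("ip_addresses.txt", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    cmd := exec.Command("dig", "ryan-schachte.com", "+short")
    cmd.Stdout = file      // fd 1 -> file
    cmd.Stderr = os.Stderr // fd 2 -> terminal
    if err := cmd.Run(); err != nil {
        log.Fatal(err)
    }
}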

Pipes

Now that we have a decent amount of knowledge regarding file descriptors and output redirection, let’s talk about pipes. In Go specifically, io.Pipe gives us a connected reader and writer that allow two goroutines to communicate. We call this a pipe because the output of one goroutine “pipes” into the input of another.
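
At its simplest, a pipe looks something like this minimal sketch: one goroutine writes into the pipe and closes it, while the other goroutine blocks reading from it until that happens:

package main

import (
    "fmt"
    "io"
)

func main() {
    reader, writer := io.Pipe()

    // producer goroutine: writes into the pipe, then closes it
    go func() {
        defer writer.Close()
        fmt.Fprintln(writer, "hello from the other goroutine")
    }()

    // consumer: ReadAll returns once the writer side has been closed
    data, err := io.ReadAll(reader)
    if err != nil {
        fmt.Println("read error:", err)
        return
    }
    fmt.Print(string(data))
}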

A common use-case for this would be to capture the output of a command and wait for the results to print. Let’s extend our toy example.

reader, writer := io.Pipe()
cmd := exec.Command("dig", "ryan-schachte.com", "+short")
cmd.Stdout = writer

go func() {
    defer writer.Close()
    cmd.Run()
    time.Sleep(10 * time.Second)
}()

output, _ := ioutil.ReadAll(reader)
fmt.Println(string(output))

In this example, we kick off a goroutine immediately, which is non-blocking. The goroutine is the writing side of the pipe: it runs the command, which could take a variable amount of time, and I intentionally added a sleep timer so we can evaluate the blocking behavior of the reads. Remember, this routine is separate from the calling routine, but for clarity, we are keeping them within the same file. The output won’t be printed to the console until the writer is closed: as you run this, you’ll see that we run the command, wait 10 seconds, close the writer, and only THEN output the IP addresses. The key takeaway here is that two goroutines are executing concurrently, but despite their concurrent execution, they communicate in a synchronized way.
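
If you’d rather see output as it arrives instead of only after the writer closes, one option is to read the pipe incrementally, for example with a bufio.Scanner. A sketch of that variation:

package main

import (
    "bufio"
    "fmt"
    "io"
    "os/exec"
)

func main() {
    reader, writer := io.Pipe()
    cmd := exec.Command("dig", "ryan-schachte.com", "+short")
    cmd.Stdout = writer

    go func() {
        // closing the writer signals EOF to the reading side
        defer writer.Close()
        cmd.Run()
    }()

    // the scanner pulls lines out of the pipe as soon as dig writes them,
    // rather than blocking until the writer closes like ReadAll does
    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        fmt.Println("got line:", scanner.Text())
    }
}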

Beyond toy examples: concurrent file uploads and chunking

Toy examples are fun, but let’s dig into a real-world scenario where this could be useful. Let’s say I have a large file that I want to upload to Cloudflare R2. R2 is an object-storage provider similar to GCS or S3.

Given physical and virtual constraints, I can’t simply toss a massive 10GB video file into storage in a single request. Additionally, if I am fetching this file from one location and uploading it to R2, I don’t want to download the entire 10GB before I can start uploading it. To handle these scenarios, it makes sense to issue range requests against the object or to pick a pre-defined chunk size. This gives me the ability to process micro-chunks of the file, and in the face of an upload failure, I can add logic like exponential backoff to retry just the failed chunk (see TUS to learn more about these types of resumable uploads).
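
As a rough illustration of the range-request idea (the URL and the fetchChunk helper are hypothetical, not part of the uploader we build below), a single 1MB slice of a remote object could be fetched like this:

package main

import (
    "fmt"
    "io"
    "net/http"
)

// fetchChunk grabs bytes [offset, offset+size) of a remote object using an
// HTTP Range request, so we never hold the whole file in memory
func fetchChunk(url string, offset, size int64) ([]byte, error) {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    // 206 Partial Content means the server honored the range header
    if resp.StatusCode != http.StatusPartialContent {
        return nil, fmt.Errorf("server did not return a partial response: %s", resp.Status)
    }
    return io.ReadAll(resp.Body)
}

func main() {
    // placeholder URL; swap in a real object that supports range requests
    chunk, err := fetchChunk("https://example.com/big-video.mp4", 0, 1024*1024)
    if err != nil {
        fmt.Println("range request failed:", err)
        return
    }
    fmt.Printf("fetched %d bytes\n", len(chunk))
}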

Architecture

We will kick off a goroutine that is solely responsible for uploading chunks of our file. This will run as some background routine that is invoked only once during the initialization of our program.

From the calling thread, we will continuously write chunks into our pipe, which the background routine holds a reference to. From there, chunks are incrementally processed and uploaded to R2.

Defining our data

type S3WriteCloser struct {
    Part        int
    WriteCloser io.WriteCloser
}

type S3Metadata struct {
    BucketName    string
    Key           string
    ContentLength int
    wg            *sync.WaitGroup
}

To keep things clear, we will encapsulate some metadata about our upload. The S3WriteCloser will have a reference to the writer, which writes the actual bytes to the pipe. Additionally, we will track the part number, which will be the number of unique chunks we send over the wire.

S3Metadata just stores some simple information like the file size (useful for the content-length HTTP header), the upload path and a wait group. To prevent our program from terminating before the upload completes, we will synchronize the upload result with the main goroutine via a WaitGroup (this will make more sense soon).
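
If the WaitGroup pattern is new to you, here is a toy sketch of the mechanism on its own, separate from the uploader:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1) // one background task to wait for

    go func() {
        defer wg.Done() // signal completion
        time.Sleep(time.Second)
        fmt.Println("background work finished")
    }()

    wg.Wait() // block main until Done has been called
    fmt.Println("safe to exit")
}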

Preparing our uploader and source object

We want to have a reference to the file we want to upload and then initialize the uploader responsible for interfacing with Cloudflare R2. Given that R2 replicates the AWS S3 API, we will use the official AWS V1 SDK for Go.

// Prevents early termination of the program while
// the background upload is processing
var wg sync.WaitGroup
wg.Add(1)

// Source file that we want to chunk and upload
file, err := os.Open(SAMPLE_FILE)
if err != nil {
    fmt.Println("Failed to open file", err)
    return
}

// only necessary for PutObject requests that need the content-length header
stat, err := file.Stat()
if err != nil {
    fmt.Println("Failed to stat file", err)
    return
}

uploader := getUploader(ACCOUNT_ID, ACCESS_KEY, SECRET_KEY)

where getUploader simply constructs the S3 uploader.

// getUploader returns uploader instance to interface with R2
func getUploader(accountId, accessKey, secretKey string) *s3manager.Uploader {
    endpoint := fmt.Sprintf("https://%s.r2.cloudflarestorage.com", accountId)
    sess, err := session.NewSession(&aws.Config{
        Region:      aws.String("us-east-1"),
        Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
        Endpoint:    &endpoint,
    })
    if err != nil {
        log.Fatal(err)
    }
    return s3manager.NewUploader(sess)
}

Handling the writing

We will start with the writing side before the reading side; this will make sense in a moment. We must create a pipe writer that our calling thread can push chunks into.

// writer will be invoked for each part of the file that we want
// to upload in parallel
func writer(uploader *s3manager.Uploader, metadata S3Metadata) *S3WriteCloser {
    r, w := io.Pipe()
    writeCloser := S3WriteCloser{
        Part:        1,
        WriteCloser: w,
    }

    go func(uploader *s3manager.Uploader, reader *io.PipeReader, metadata S3Metadata) {
        err := processUpload(uploader, reader, metadata)
        if err != nil {
            log.Fatal("error uploading to R2: ", err)
        }
        fmt.Printf("Uploaded a total of %d parts\n", writeCloser.Part)
    }(uploader, r, metadata)

    return &writeCloser
}

We have the uploader from the previous section and the metadata from the section before that. We begin by initializing an io.Pipe(), which is convenient because it hands us both a reader and a writer.

As you see, I attach the writer to the writeCloser and return it. Because this pipe allows communication between goroutines, I can write data from anywhere and some other goroutine can retrieve those bytes.

Now, before returning, I immediately invoke the upload code in a goroutine. You may be wondering: how can we upload something if we haven’t written any data? The simple answer is that reads from the pipe block until data is written, and the upload only completes once the writer is closed. Given that the writer is still open, the upload goroutine will keep consuming chunks as they come in.

Let’s take a look at the processUpload call above.

// processUpload will handle chunks of multipart uploads and will complete
// once the io writer is closed
func processUpload(
    uploader *s3manager.Uploader,
    reader *io.PipeReader,
    metadata S3Metadata,
) error {
    fmt.Println("Uploading to R2")
    upload := func() error {
        _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(metadata.BucketName),
            Key:    aws.String(metadata.Key),
            Body:   aws.ReadSeekCloser(reader),
        })
        if err == nil {
            metadata.wg.Done()
        }
        return err
    }

    if err := upload(); err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}

It looks like a lot is happening here, but it’s pretty straightforward. The first thing we do is start the upload. Notice that the body of the request is our pipe reader, which keeps feeding the SDK bytes until the writer is closed. Within the AWS SDK, chunked uploads happen in parallel using the multipart upload functionality. Until the pipe writer is closed, the upload will keep waiting to receive more chunks.

Once we close the writer, the upload completes, we decrement the wait group and the program is free to terminate.
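
The s3manager uploader also lets you tune that multipart behavior. As a sketch, a variant of getUploader (the getTunedUploader name and the 10MB/5 values are just examples, not part of the original code) could set the part size and upload concurrency explicitly:

// getTunedUploader is a variant of getUploader that also configures the
// multipart behavior; the 10MB part size and concurrency of 5 are arbitrary
func getTunedUploader(accountId, accessKey, secretKey string) *s3manager.Uploader {
    endpoint := fmt.Sprintf("https://%s.r2.cloudflarestorage.com", accountId)
    sess, err := session.NewSession(&aws.Config{
        Region:      aws.String("us-east-1"),
        Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
        Endpoint:    &endpoint,
    })
    if err != nil {
        log.Fatal(err)
    }
    return s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
        u.PartSize = 10 * 1024 * 1024 // each part sent to R2 is 10MB
        u.Concurrency = 5             // up to 5 parts uploaded in parallel
    })
}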

Chunking our file

As mentioned previously, uploading a 10GB file in one shot would rarely work in the real world. Instead, we can process micro-chunks, say 1MB of data at a time. This is not only better for resiliency, it also reduces network congestion.

Now that we have our write logic complete, let’s incrementally push bytes of data to the pipe and have R2 upload them to Cloudflare.

Let’s show a full overview of our main function.

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    file, err := os.Open(SAMPLE_FILE)
    if err != nil {
        fmt.Println("Failed to open file", err)
        return
    }

    // only necessary for PutObject requests that need the content-length header
    stat, err := file.Stat()
    if err != nil {
        fmt.Println("Failed to stat file", err)
        return
    }

    uploader := getUploader(ACCOUNT_ID, ACCESS_KEY, SECRET_KEY)
    closer := writer(uploader, S3Metadata{
        BucketName:    BUCKET_NAME,
        Key:           SAMPLE_FILE,
        ContentLength: int(stat.Size()),
        wg:            &wg,
    })

    // simulate range requests or chunking here
    buffer := make([]byte, 1024*1024)
    for {
        n, err := file.Read(buffer)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println("Error reading file:", err)
            return
        }

        // only write the bytes actually read so the final partial
        // chunk doesn't include stale data from the buffer
        written, err := closer.WriteCloser.Write(buffer[:n])
        closer.Part++
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Wrote %d KB\n", written/1024)
    }

    closer.WriteCloser.Close()
    fmt.Println("Waiting for Upload to complete...")
    wg.Wait()
    fmt.Println("Upload Completed")
}

As you see, I’ve created a memory buffer of 1MB. This means I push 1MB of the file into the pipe at a time. Within the loop, I read sequential bytes of the file and write them to the pipe. The Part counter lets me see how many chunks I’m actually processing.

Once the iteration over the file is complete, I’m free to close the writer. This tells the reader that it’s done receiving data and the upload can complete. Looking back at the upload logic, the wait group decrements and the program terminates successfully.

Complete Example

Click here for the GitHub repo

package main

import (
    "fmt"
    "io"
    "log"
    "os"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

const ACCESS_KEY = ""
const SECRET_KEY = ""
const ACCOUNT_ID = ""
const SAMPLE_FILE = ""
const BUCKET_NAME = ""

type S3WriteCloser struct {
    Part        int
    WriteCloser io.WriteCloser
}

type S3Metadata struct {
    BucketName    string
    Key           string
    ContentLength int
    wg            *sync.WaitGroup
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    file, err := os.Open(SAMPLE_FILE)
    if err != nil {
        fmt.Println("Failed to open file", err)
        return
    }

    // only necessary for PutObject requests that need the content-length header
    stat, err := file.Stat()
    if err != nil {
        fmt.Println("Failed to stat file", err)
        return
    }

    uploader := getUploader(ACCOUNT_ID, ACCESS_KEY, SECRET_KEY)
    closer := writer(uploader, S3Metadata{
        BucketName:    BUCKET_NAME,
        Key:           SAMPLE_FILE,
        ContentLength: int(stat.Size()),
        wg:            &wg,
    })

    // simulate range requests or chunking here
    buffer := make([]byte, 1024*1024)
    for {
        n, err := file.Read(buffer)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println("Error reading file:", err)
            return
        }

        // only write the bytes actually read so the final partial
        // chunk doesn't include stale data from the buffer
        written, err := closer.WriteCloser.Write(buffer[:n])
        closer.Part++
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Wrote %d KB\n", written/1024)
    }

    closer.WriteCloser.Close()
    fmt.Println("Waiting for Upload to complete...")
    wg.Wait()
    fmt.Println("Upload Completed")
}

// writer will be invoked for each part of the file that we want
// to upload in parallel
func writer(uploader *s3manager.Uploader, metadata S3Metadata) *S3WriteCloser {
    r, w := io.Pipe()
    writeCloser := S3WriteCloser{
        Part:        1,
        WriteCloser: w,
    }

    go func(uploader *s3manager.Uploader, reader *io.PipeReader, metadata S3Metadata) {
        err := processUpload(uploader, reader, metadata)
        if err != nil {
            log.Fatal("error uploading to R2: ", err)
        }
        fmt.Printf("Uploaded a total of %d parts\n", writeCloser.Part)
    }(uploader, r, metadata)
    return &writeCloser
}

// processUpload will handle chunks of multipart uploads and will complete
// once the io writer is closed
func processUpload(
    uploader *s3manager.Uploader,
    reader *io.PipeReader,
    metadata S3Metadata,
) error {
    fmt.Println("Uploading to R2")
    upload := func() error {
        _, err := uploader.Upload(&s3manager.UploadInput{
            Bucket: aws.String(metadata.BucketName),
            Key:    aws.String(metadata.Key),
            Body:   aws.ReadSeekCloser(reader),
        })
        if err == nil {
            metadata.wg.Done()
        }
        return err
    }

    if err := handleBucketCreation(uploader, metadata.BucketName); err != nil {
        fmt.Println(err)
        return err
    }
    if err := upload(); err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}

func handleBucketCreation(uploader *s3manager.Uploader, bucket string) error {
    createBucketInput := &s3.CreateBucketInput{
        CreateBucketConfiguration: &s3.CreateBucketConfiguration{
            LocationConstraint: aws.String("hint:WNAM"),
        },
        Bucket: aws.String(bucket),
    }
    _, err := uploader.S3.CreateBucket(createBucketInput)
    return err
}

// getUploader returns uploader instance to interface with R2
func getUploader(accountId, accessKey, secretKey string) *s3manager.Uploader {
    endpoint := fmt.Sprintf("https://%s.r2.cloudflarestorage.com", accountId)
    sess, err := session.NewSession(&aws.Config{
        Region:      aws.String("us-east-1"),
        Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
        Endpoint:    &endpoint,
    })
    if err != nil {
        log.Fatal(err)
    }
    return s3manager.NewUploader(sess)
}