GoSuda

在 Go 中缩短端点响应时间——利用任务队列

By Yunjin Lee
views ...

概述

初次学习 Go 语言时,通常会通过实现后端服务器来学习。此时,我们以一个接收 RestAPI 等文件流并将其上传到服务器的示例为例。 Go 语言的 net/http 服务器默认可以同时处理多个请求,因此并发上传本身没有问题。但是,如果流接收后的所有操作都同步处理,则会延迟端点的响应。让我们探讨一种防止这种情况发生的技术。

原因

接收流通常需要较长时间,对于大文件,单个请求可能需要数分钟才能处理完毕。在这种情况下,尽快处理接收后的操作至关重要。此示例场景是接收流后保存为临时文件,然后推送到容器的场景。此时,如果将临时文件推送到容器的部分通过 Worker Pool 处理,则可以缩短响应延迟。

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

您可能已经注意到,在标记为 THIS PART 的部分,任务被插入到队列中。

现在让我们看看任务队列是如何工作的。

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

提供的任务队列示例实现简单。此任务队列具有从通道中取出队列中任务的简单结构。

以下是用于在文件上传后推送到容器的 Worker 方法。当前方法为了良好的响应性和简单的实现而采用无限循环,但可以根据需要添加算法。

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

首先,此函数会持续循环并尝试从队列中获取任务。之后,它会在重试范围内尝试从流-临时文件到临时文件-容器的上传阶段。

优点

这种处理方式的优点是,只要流上传正常,就可以减少后续操作的延迟时间,并防止因并发容器操作导致资源耗尽。正如当前代码所示,可以同时进行的容器操作数量受限于通道的数量。通过这种方式,我们了解了一个实用地使用 Go 并行处理的示例。如果想查看更多示例,请访问以下链接。包含示例的模块 利用示例的项目 项目本身包含许多辅助组件,因此对于 Worker 本身的学习,只需简要查看 main.go 中如何调用 Worker 初始化函数即可。该模块还包含其他形式的 Worker,敬请参考。