GoSuda

Minskad svarstid för endpoints i Go – Använda Work Queues

By Yunjin Lee
views ...

Översikt

När man först lär sig Go, är det ofta genom att implementera en backend-server. Låt oss ta ett exempel där man skapar ett exempel för att ta emot en filström från RestAPI och ladda upp den till servern. Go-språkets net/http-server hanterar som standard flera förfrågningar samtidigt, så själva den samtidiga uppladdningen är inget problem. Men om alla operationer efter mottagandet av strömmen hanteras synkront, kommer svarstiden för endpointen att fördröjas. Låt oss undersöka en teknik för att förhindra en sådan situation.

Orsak

Mottagandet av en ström tar i allmänhet lång tid, och för stora filer kan en enskild förfrågan ta flera minuter att behandla. I sådana fall är det viktigt att snabbt hantera operationer efter mottagandet. Detta exempelscenario är att ta emot en ström, spara den som en temporär fil och sedan pusha den till en container. Om push-operationen av den temporära filen till containern hanteras av en worker pool i detta skede, kan svarstidsfördröjningen reduceras.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Du har förmodligen märkt att delen markerad med "THIS PART" indikerar att en uppgift läggs till i kön.

Låt oss nu se hur uppgiftskön fungerar.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Den tillhandahållna uppgiftskön är enkelt implementerad. Denna uppgiftskö har en enkel struktur som tar bort uppgifter från kanalen som finns i kön.

Nedan följer en worker-metod för att pusha filer till containern efter uppladdning. Den nuvarande metoden använder en oändlig loop för god responsivitet och enkel implementering, men du kan lägga till algoritmer beroende på syftet.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Först försöker denna funktion kontinuerligt att hämta uppgifter från kön i en loop. Därefter försöker den ladda upp från temporär fil till container, snarare än från ström till temporär fil, inom det angivna antalet försök.

Fördelar

Fördelen med denna bearbetningsmetod är att den kan minska fördröjningstiden för efterföljande uppgifter, förutsatt att strömuppladdningen är normal, och förhindra resursbrist på grund av samtidiga containeroperationer. Som framgår av den nuvarande koden är antalet samtidiga containeroperationer begränsat av antalet kanaler. Detta var ett praktiskt exempel på hur Go:s parallella bearbetning kan användas. Om du vill se fler exempel, besök länkarna nedan.Modul inklusive exempel Projekt som använder exemplen Själva projektet innehåller många sidoelement, så för att lära dig om workern behöver du bara kortfattat titta på hur worker init-funktionen anropas i main.go. Modulen innehåller också andra typer av workers, så titta gärna på dem.