GoSuda

Minska Endpoint-svarstider i Go – Använd en Job Queue

By Yunjin Lee
views ...

Översikt

När man först lär sig Go, är det ofta man gör det genom att implementera en backend-server. Låt oss ta ett exempel där man skapar ett exempel för att ta emot en filström från RestAPI etc. och ladda upp den till servern. Go-språkets net/http-server hanterar i princip flera förfrågningar samtidigt, så det finns inga problem med samtidiga uppladdningar i sig. Men om alla operationer efter mottagandet av strömmen hanteras synkront, fördröjs svarstiden för endpointen. Låt oss undersöka en teknik för att förhindra denna situation.

Orsak

Det tar i allmänhet lång tid att ta emot en ström, och för stora filer kan en enskild förfrågan behandlas i flera minuter. I sådana fall är det viktigt att behandla operationer efter mottagandet så snabbt som möjligt. Detta exempelscenario är att ta emot en ström, spara den som en temporär fil och sedan pusha den till en container. Om delen som pushar den temporära filen till containern hanteras med en worker pool, kan svarstiden förkortas.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Skapa unik temporär filsökväg på hosten
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Skapa och kopiera request body till temporär fil (synkront)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Lägg till uppgift för asynkron Incus-push i kö
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DENNA DEL
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Skicka omedelbart 202 Accepted svar
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Du har förmodligen märkt att den del som är markerad med THIS PART indikerar att uppgiften läggs till i kön.

Låt oss nu se hur uppgiftskön fungerar.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initierar den in-memory uppgiftskön.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask lägger till en UploadTask i kön.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask hämtar en UploadTask från kön och blockerar om kön är tom.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returnerar aktuell köstorlek.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Den medföljande uppgiftskön är enkelt implementerad. Denna uppgiftskö har en enkel struktur som hämtar uppgifter från kanalen som finns i kön.

Nedan följer worker-metoden för att pusha filer till containern efter uppladdning. Den nuvarande metoden är en oändlig loop för god responsivitet och enkel implementering, men algoritmer kan läggas till beroende på syftet.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Skjut upp rensningen av den temporära filen
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Bearbeta uppgiften med återförsök för tillfälliga Incus-fel
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separat uppladdningsuppgift
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Exempel på permanent fel
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Först försöker denna funktion kontinuerligt att hämta uppgifter från kön genom en loop. Därefter försöker den ladda upp från den temporära filen till containern, snarare än från strömmen till den temporära filen, inom ramen för återförsök.

Fördelar

Fördelen med denna bearbetningsmetod är att den kan minska fördröjningstiden för efterföljande bearbetningsuppgifter om strömuppladdningen är normal, och den kan förhindra resursbrist på grund av samtidiga containeroperationer. Som det framgår av den nuvarande koden begränsas antalet samtidiga containeroperationer av antalet kanaler. På detta sätt har vi undersökt ett praktiskt exempel på hur man kan använda Gos parallella bearbetning. Om du vill se fler exempel, besök länkarna nedan.Modul inklusive exempel Projekt som använder exempel Själva projektet innehåller många sidokomponenter, så för att lära dig om själva workern behöver du bara kortfattat titta på hur worker init-funktionen anropas i main.go. Modulen innehåller även andra typer av workers, så tänk på det.