Reducerea timpului de răspuns al endpoint-ului în Go - Utilizarea cozilor de lucru
Introducere
Când se învață Go pentru prima dată, este frecvent să se învețe prin implementarea unui server backend. Să luăm ca exemplu crearea unui exemplu în care un flux de fișiere este primit printr-un REST API și încărcat pe server. Serverul Go net/http gestionează în mod implicit multiple solicitări simultan, astfel încât încărcările concurente nu reprezintă o problemă în sine. Totuși, dacă toate operațiile ulterioare recepționării fluxului sunt procesate sincron, răspunsul endpoint-ului va fi întârziat. Să examinăm o tehnică pentru a preveni o astfel de situație.
Cauză
Recepționarea unui flux necesită, în general, un timp considerabil, iar în cazul fișierelor mari, o singură solicitare poate fi procesată timp de câteva minute. În astfel de cazuri, este crucial să se proceseze operațiile ulterioare recepționării cât mai rapid posibil. Acest scenariu exemplu implică recepționarea unui flux, salvarea acestuia ca fișier temporar și apoi împingerea în container. În acest moment, dacă partea de împingere a fișierului temporar în container este gestionată de un worker pool, întârzierea răspunsului poate fi redusă.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Directorul temporar al gazdei
14
15// UploadTask conține date pentru împingerea asincronă a fișierului.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler procesează încărcările de fișiere. Salvează în fișier temporar, apoi pune în coadă pentru împingere Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Creează o cale unică pentru fișierul temporar pe gazdă
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Creează și copiază corpul solicitării în fișierul temporar (sincron)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Pune în coadă sarcina pentru împingere Incus asincronă
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //ACEASTĂ PARTE
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Trimite un răspuns imediat 202 Accepted
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Dacă observați partea marcată cu "THIS PART", probabil ați realizat că sarcina este inserată într-o coadă.
Acum, să vedem cum funcționează coada de sarcini.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue inițializează coada de sarcini în memorie.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adaugă un UploadTask în coadă.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask recuperează un UploadTask din coadă, blocând dacă este goală.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returnează dimensiunea curentă a cozii.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Coada de sarcini furnizată ca exemplu este implementată simplu. Această coadă de sarcini are o structură simplă care extrage sarcinile din canal.
Mai jos este metoda worker pentru a împinge fișierul în container după încărcare. Metoda curentă este o buclă infinită pentru o bună responsivitate și o implementare ușoară, dar se poate adăuga un algoritm în funcție de scop.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Amână curățarea fișierului temporar
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Procesează sarcina cu reîncercări pentru erorile tranzitorii Incus
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //sarcină de încărcare separată
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Exemplu de eroare permanentă
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
În primul rând, această funcție rulează continuu într-o buclă și încearcă să preia sarcini din coadă. Apoi, în intervalul de reîncercare, încearcă etapa de încărcare de la fișierul temporar la container, nu de la flux la fișierul temporar.
Avantaje
Avantajul acestei metode de procesare este că, dacă încărcarea fluxului este normală, timpul de întârziere pentru operațiile ulterioare poate fi redus și epuizarea resurselor cauzată de operațiile concurente ale containerului poate fi prevenită. După cum se poate observa în codul actual, numărul de operații concurente ale containerului este limitat de numărul de canale. Astfel, am examinat un exemplu practic de utilizare a procesării paralele în Go. Dacă doriți să vedeți mai multe exemple, vă rugăm să vizitați linkurile de mai jos.Modulul care conține exemplul Proiectul care utilizează exemplul Proiectul în sine conține multe componente auxiliare, așa că pentru a înțelege worker-ul în sine, este suficient să observați pe scurt cum este apelată funcția de inițializare a worker-ului în main.go. Modulul include, de asemenea, alte tipuri de worker-i, așa că vă rugăm să rețineți acest lucru.