GoSuda

Ridurre il Tempo di Risposta degli Endpoint in Go - Utilizzando una Coda di Lavoro (Work Queue)

By Yunjin Lee
views ...

개요

Quando si impara Go per la prima volta, spesso si impara implementando un server di backend. In questo caso, consideriamo l'esempio della creazione di un codice di esempio che riceve uno stream di file da una RestAPI o simile e lo carica sul server. Il server net/http del linguaggio Go gestisce di base più richieste contemporaneamente, quindi l'upload simultaneo in sé non è un problema. Tuttavia, se tutti i processi successivi alla ricezione dello stream vengono gestiti in modo sincrono, la risposta dell'endpoint viene ritardata. Scopriamo una tecnica per prevenire tale situazione.

원인

Anche la ricezione dello stream richiede generalmente molto tempo, e nel caso di file di grandi dimensioni, una singola richiesta può essere elaborata per diversi minuti. In questo caso, è cruciale elaborare le operazioni successive alla ricezione il più rapidamente possibile. Questo scenario di esempio prevede la ricezione dello stream, il salvataggio come file temporaneo e il push nel container. In questo frangente, se la parte relativa al push del file temporaneo nel container viene gestita con un "worker pool", è possibile ridurre il ritardo della risposta.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Crea un percorso di file temporaneo univoco sull'host (Create unique temporary file path on host)
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Crea e copia il corpo della richiesta nel file temporaneo (sincrono) (Create and copy request body to temporary file (synchronous))
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Incolonna il task per il push Incus asincrono (Enqueue task for asynchronous Incus push)
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //QUESTA PARTE (THIS PART)
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Invia una risposta immediata 202 Accepted (Send immediate 202 Accepted response)
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Noterete che la parte contrassegnata come "THIS PART" (QUESTA PARTE) inserisce l'attività in una coda.

Vediamo ora come funziona la coda di attività.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

La coda di attività fornita come esempio è implementata in modo semplice. Questa coda di attività ha una struttura semplice che estrae le attività in coda dal "channel".

Di seguito è riportato il metodo "worker" per il push nel container dopo l'upload del file. Attualmente, il metodo è un ciclo infinito per garantire una buona reattività e una facile implementazione, ma è possibile aggiungere algoritmi in base allo scopo.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Posticipa la pulizia del file temporaneo (Defer cleanup of the temporary file)
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Elabora l'attività con tentativi per errori Incus transitori (Process task with retries for transient Incus errors)
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Esempio di errore permanente (Example permanent error)
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

In primo luogo, questa funzione continua a ciclare e tenta di recuperare un "task" dalla coda. Successivamente, tenta la fase di "upload" dal file temporaneo al container, anziché dallo stream al file temporaneo, entro l'ambito dei tentativi.

이점

Il vantaggio di questo metodo di elaborazione è che, se l'upload dello stream è riuscito, è possibile ridurre il tempo di latenza per le operazioni successive ed è possibile prevenire l'esaurimento delle risorse causato da operazioni simultanee sui container. Come si può notare dal codice attuale, il numero di operazioni simultanee sui container è limitato dal numero di "channel". Abbiamo esaminato un esempio pratico di come utilizzare l'elaborazione parallela di Go. Se si desiderano visualizzare ulteriori esempi, si prega di visitare i seguenti link.Modulo contenente esempi Progetto che utilizza gli esempi Poiché il progetto stesso contiene molti componenti accessori, per apprendere il "worker" in sé, è sufficiente dare un breve sguardo a come viene chiamata la funzione di inizializzazione del "worker" in main.go. Il modulo include anche altre forme di "worker", quindi si prega di farvi riferimento.