GoSuda

Réduire le temps de réponse des endpoints dans Go – Utilisation des files d'attente de tâches

By Yunjin Lee
views ...

Aperçu

Lors de l'apprentissage initial de Go, il est fréquent d'implémenter un serveur backend. Prenons l'exemple de la création d'un cas d'usage où des flux de fichiers sont reçus via une RestAPI, puis téléchargés vers le serveur. Le serveur net/http de Go gère nativement plusieurs requêtes simultanément, ce qui signifie que le téléchargement simultané en soi ne pose aucun problème. Cependant, si toutes les opérations suivant la réception du flux sont traitées de manière synchrone, la réponse du endpoint sera retardée. Examinons une technique permettant de prévenir une telle situation.

Cause

La réception d'un flux prend généralement beaucoup de temps, et pour les fichiers volumineux, une seule requête peut être traitée pendant plusieurs minutes. Dans de tels cas, il est crucial de traiter les opérations post-réception le plus rapidement possible. Ce scénario d'exemple consiste à recevoir un flux, à l'enregistrer en tant que fichier temporaire, puis à le pousser vers un conteneur. À ce stade, si la partie de l'envoi du fichier temporaire vers le conteneur est gérée par un "worker pool", le délai de réponse peut être réduit.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Répertoire temporaire de l'hôte
14
15// UploadTask contient les données pour la poussée asynchrone de fichiers.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler traite les téléchargements de fichiers. Enregistre dans un fichier temporaire, puis met en file d'attente pour la poussée Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Crée un chemin de fichier temporaire unique sur l'hôte
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Crée et copie le corps de la requête dans un fichier temporaire (synchrone)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Met en file d'attente la tâche pour la poussée Incus asynchrone
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //CETTE PARTIE
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Envoie une réponse immédiate 202 Accepted
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

En observant la partie indiquée par "THIS PART", vous aurez remarqué que la tâche est insérée dans une file d'attente.

Examinons maintenant le fonctionnement de la file d'attente des tâches.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialise la file d'attente de tâches en mémoire.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask ajoute une UploadTask à la file d'attente.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask récupère une UploadTask de la file d'attente, bloquant si vide.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength renvoie la taille actuelle de la file d'attente.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

La file d'attente des tâches fournie en exemple est implémentée de manière simple. Cette file d'attente a une structure rudimentaire qui extrait les tâches du canal.

Voici la méthode du "worker" pour pousser les fichiers vers le conteneur après le téléchargement. La méthode actuelle est une boucle infinie pour une bonne réactivité et une implémentation facile, mais il est possible d'ajouter des algorithmes en fonction de l'usage.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Reporte le nettoyage du fichier temporaire
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Traite la tâche avec des tentatives en cas d'erreurs Incus transitoires
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //tâche de téléchargement séparée
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Exemple d'erreur permanente
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Tout d'abord, cette fonction boucle continuellement pour tenter de récupérer des tâches de la file d'attente. Ensuite, elle tente l'étape de téléchargement du fichier temporaire vers le conteneur (plutôt que du flux vers le fichier temporaire) dans la limite des tentatives autorisées.

Avantages

L'avantage de cette méthode de traitement est qu'elle peut réduire le temps de latence pour les opérations ultérieures, à condition que le téléchargement du flux soit réussi, et prévenir l'épuisement des ressources dû à des opérations de conteneur simultanées. Comme le montre le code actuel, le nombre d'opérations de conteneur pouvant être exécutées simultanément est limité par la taille du canal. Nous avons ainsi examiné un exemple pratique d'utilisation du traitement parallèle de Go. Si vous souhaitez voir davantage d'exemples, veuillez visiter les liens ci-dessous.Module incluant des exemples Projet utilisant les exemples Le projet lui-même contient de nombreux composants auxiliaires ; pour l'apprentissage du "worker" en soi, il suffit de jeter un bref coup d'œil à la façon dont la fonction d'initialisation du "worker" est appelée dans main.go. Le module inclut également d'autres types de "workers", veuillez vous y référer.