GoSuda

Réduire le temps de réponse des endpoints en Go – Utilisation des files d'attente de tâches

By Yunjin Lee
views ...

Aperçu

Lors de l'apprentissage initial de Go, il est fréquent d'implémenter un serveur backend pour s'exercer. Prenons l'exemple de la création d'un exemple où un serveur reçoit un flux de fichiers via une API REST ou similaire pour les télécharger. Le serveur net/http de Go gère par défaut plusieurs requêtes simultanément, il n'y a donc pas de problème avec les téléchargements concurrents eux-mêmes. Cependant, si toutes les opérations suivant la réception du flux sont traitées de manière synchrone, la réponse du point de terminaison sera retardée. Examinons une technique pour prévenir de telles situations.

Cause

La réception d'un flux prend généralement un temps considérable, et pour les fichiers volumineux, une seule requête peut être traitée pendant plusieurs minutes. Dans de tels cas, il est crucial de traiter les opérations post-réception aussi rapidement que possible. Ce scénario d'exemple implique la réception d'un flux, son stockage en tant que fichier temporaire, puis son transfert vers un conteneur. À ce stade, si la partie du transfert du fichier temporaire vers le conteneur est gérée par un pool de travailleurs, le délai de réponse peut être réduit.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Répertoire temporaire de l'hôte
14
15// UploadTask contient les données pour le transfert de fichier asynchrone.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler traite les téléchargements de fichiers. Sauvegarde dans un fichier temporaire, puis met en file d'attente pour le transfert Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Créer un chemin de fichier temporaire unique sur l'hôte
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Créer et copier le corps de la requête dans un fichier temporaire (synchrone)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Mettre en file d'attente la tâche pour le transfert Incus asynchrone
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //CETTE PARTIE
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Envoyer une réponse immédiate 202 Accepted
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Vous avez probablement remarqué que la partie marquée "THIS PART" indique l'insertion de la tâche dans une file d'attente.

Voyons maintenant comment la file d'attente de tâches fonctionne.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialise la file d'attente de tâches en mémoire.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask ajoute une UploadTask à la file d'attente.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask récupère une UploadTask de la file d'attente, bloquant si vide.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength renvoie la taille actuelle de la file d'attente.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

La file d'attente de tâches fournie en exemple est implémentée de manière simple. Cette file d'attente de tâches a une structure simple qui consiste à retirer les tâches du canal.

Voici la méthode de travailleur pour le transfert vers le conteneur après le téléchargement du fichier. La méthode actuelle est une boucle infinie pour une bonne réactivité et une implémentation facile, mais vous pouvez ajouter des algorithmes selon vos besoins.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Différer le nettoyage du fichier temporaire
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Traiter la tâche avec des tentatives pour les erreurs Incus transitoires
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //tâche de téléchargement séparée
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Exemple d'erreur permanente
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Tout d'abord, cette fonction boucle continuellement, tentant de récupérer des tâches de la file d'attente. Ensuite, elle tente l'étape de téléchargement du fichier temporaire vers le conteneur (plutôt que du flux vers le fichier temporaire) dans la plage de réessais.

Avantages

L'avantage de cette méthode de traitement est qu'elle peut réduire le délai pour les opérations ultérieures si le téléchargement du flux est réussi, et elle peut prévenir l'épuisement des ressources dû aux opérations simultanées sur les conteneurs. Comme le montre le code actuel, le nombre d'opérations simultanées sur les conteneurs est limité par le nombre de canaux. Nous avons examiné un exemple pratique d'utilisation du traitement parallèle de Go de cette manière. Si vous souhaitez voir plus d'exemples, veuillez visiter les liens ci-dessous.Module incluant des exemples Projet utilisant les exemples Le projet lui-même contient de nombreux composants auxiliaires, donc pour l'apprentissage du travailleur lui-même, il suffit de regarder brièvement comment la fonction d'initialisation du travailleur est appelée dans main.go. Le module inclut également d'autres types de travailleurs, veuillez donc vous y référer.