GoSuda

Reducción del tiempo de respuesta del endpoint en Go: utilización de colas de tareas

By Yunjin Lee
views ...

Resumen

Cuando se aprende Go por primera vez, es frecuente que el aprendizaje se realice implementando un servidor de backend. Tomemos como ejemplo la creación de un caso de uso en el que se recibe un flujo de archivo de una RestAPI y se carga al servidor. El servidor net/http del lenguaje Go procesa múltiples solicitudes simultáneamente por defecto, por lo que la carga concurrente no presenta problemas en sí misma. Sin embargo, si todas las operaciones posteriores a la recepción del flujo se procesan de forma síncrona, la respuesta del endpoint se retrasa. A continuación, exploraremos una técnica para prevenir esta situación.

Causa

La recepción de un flujo generalmente requiere un tiempo considerable, y en el caso de archivos grandes, una sola solicitud puede tardar varios minutos en procesarse. En tales circunstancias, es crucial procesar las operaciones posteriores a la recepción de la manera más rápida posible. Este escenario de ejemplo implica la recepción de un flujo, su almacenamiento como archivo temporal y su posterior envío a un contenedor. Si la parte del envío del archivo temporal al contenedor se gestiona con un "worker pool", el retraso en la respuesta puede reducirse.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Directorio temporal del host
14
15// UploadTask contiene datos para el envío asíncrono de archivos.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler procesa las cargas de archivos. Guarda en un archivo temporal y luego encola para Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Crea una ruta de archivo temporal única en el host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Crea y copia el cuerpo de la solicitud a un archivo temporal (síncrono)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Encola la tarea para el envío asíncrono de Incus
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //ESTA PARTE
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Envía una respuesta inmediata 202 Accepted
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Al observar la parte marcada como "ESTA PARTE", se habrá percatado de que se inserta una tarea en la cola.

Ahora, veamos cómo funciona la cola de tareas.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue inicializa la cola de tareas en memoria.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask añade una UploadTask a la cola.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask recupera una UploadTask de la cola, bloqueando si está vacía.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength devuelve el tamaño actual de la cola.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

La cola de tareas proporcionada como ejemplo está implementada de forma sencilla. Esta cola de tareas tiene una estructura simple que extrae las tareas de un canal.

A continuación, se presenta el método "worker" para enviar archivos a un contenedor después de la carga. Actualmente, el método utiliza un bucle infinito para una buena capacidad de respuesta y una fácil implementación, pero se puede añadir un algoritmo según el propósito.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Aplaza la limpieza del archivo temporal
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Procesa la tarea con reintentos para errores transitorios de Incus
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //tarea de carga separada
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Ejemplo de error permanente
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Primero, esta función itera continuamente intentando obtener una tarea de la cola. Posteriormente, dentro del rango de reintentos, intenta la etapa de carga de archivo temporal a contenedor, en lugar de flujo a archivo temporal.

Ventajas

La ventaja de este enfoque de procesamiento es que, si la carga del flujo es exitosa, se puede reducir el tiempo de retraso para las operaciones posteriores y prevenir el agotamiento de recursos debido a operaciones concurrentes en el contenedor. Como se observa en el código actual, las operaciones concurrentes en el contenedor están limitadas por el número de canales. De esta manera, hemos explorado un ejemplo práctico de cómo utilizar el procesamiento concurrente de Go. Si desea ver más ejemplos, visite los siguientes enlaces.Módulo que incluye ejemplos Proyecto que utiliza los ejemplos El proyecto en sí tiene muchos componentes adicionales, por lo que para aprender sobre el "worker" en sí, solo necesita revisar brevemente cómo se llama la función de inicialización del "worker" en main.go. El módulo también incluye otros tipos de "workers", así que consúltelo.