Reducción del tiempo de respuesta del endpoint en Go: utilización de colas de tareas
Resumen
Cuando se aprende Go por primera vez, es frecuente que el aprendizaje se realice implementando un servidor de backend. Tomemos como ejemplo la creación de un caso de uso en el que se recibe un flujo de archivo de una RestAPI y se carga al servidor. El servidor net/http del lenguaje Go procesa múltiples solicitudes simultáneamente por defecto, por lo que la carga concurrente no presenta problemas en sí misma. Sin embargo, si todas las operaciones posteriores a la recepción del flujo se procesan de forma síncrona, la respuesta del endpoint se retrasa. A continuación, exploraremos una técnica para prevenir esta situación.
Causa
La recepción de un flujo generalmente requiere un tiempo considerable, y en el caso de archivos grandes, una sola solicitud puede tardar varios minutos en procesarse. En tales circunstancias, es crucial procesar las operaciones posteriores a la recepción de la manera más rápida posible. Este escenario de ejemplo implica la recepción de un flujo, su almacenamiento como archivo temporal y su posterior envío a un contenedor. Si la parte del envío del archivo temporal al contenedor se gestiona con un "worker pool", el retraso en la respuesta puede reducirse.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Directorio temporal del host
14
15// UploadTask contiene datos para el envío asíncrono de archivos.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler procesa las cargas de archivos. Guarda en un archivo temporal y luego encola para Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Crea una ruta de archivo temporal única en el host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Crea y copia el cuerpo de la solicitud a un archivo temporal (síncrono)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Encola la tarea para el envío asíncrono de Incus
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //ESTA PARTE
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Envía una respuesta inmediata 202 Accepted
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Al observar la parte marcada como "ESTA PARTE", se habrá percatado de que se inserta una tarea en la cola.
Ahora, veamos cómo funciona la cola de tareas.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue inicializa la cola de tareas en memoria.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask añade una UploadTask a la cola.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask recupera una UploadTask de la cola, bloqueando si está vacía.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength devuelve el tamaño actual de la cola.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
La cola de tareas proporcionada como ejemplo está implementada de forma sencilla. Esta cola de tareas tiene una estructura simple que extrae las tareas de un canal.
A continuación, se presenta el método "worker" para enviar archivos a un contenedor después de la carga. Actualmente, el método utiliza un bucle infinito para una buena capacidad de respuesta y una fácil implementación, pero se puede añadir un algoritmo según el propósito.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Aplaza la limpieza del archivo temporal
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Procesa la tarea con reintentos para errores transitorios de Incus
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //tarea de carga separada
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Ejemplo de error permanente
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Primero, esta función itera continuamente intentando obtener una tarea de la cola. Posteriormente, dentro del rango de reintentos, intenta la etapa de carga de archivo temporal a contenedor, en lugar de flujo a archivo temporal.
Ventajas
La ventaja de este enfoque de procesamiento es que, si la carga del flujo es exitosa, se puede reducir el tiempo de retraso para las operaciones posteriores y prevenir el agotamiento de recursos debido a operaciones concurrentes en el contenedor. Como se observa en el código actual, las operaciones concurrentes en el contenedor están limitadas por el número de canales. De esta manera, hemos explorado un ejemplo práctico de cómo utilizar el procesamiento concurrente de Go. Si desea ver más ejemplos, visite los siguientes enlaces.Módulo que incluye ejemplos Proyecto que utiliza los ejemplos El proyecto en sí tiene muchos componentes adicionales, por lo que para aprender sobre el "worker" en sí, solo necesita revisar brevemente cómo se llama la función de inicialización del "worker" en main.go. El módulo también incluye otros tipos de "workers", así que consúltelo.