Reducción del tiempo de respuesta del endpoint en Go: uso de colas de tareas
개요
Cuando se aprende Go por primera vez, a menudo se aprende implementando un servidor backend. Tomemos como ejemplo el caso de crear un ejemplo para recibir el flujo de un archivo (file stream) desde una RestAPI, entre otros, y subirlo al servidor. El servidor net/http del lenguaje Go maneja fundamentalmente múltiples solicitudes simultáneamente, por lo que la subida concurrente en sí misma no es problemática. Sin embargo, si todas las operaciones posteriores a la recepción del flujo se procesan de forma síncrona, la respuesta del endpoint se retrasa. Examinemos una técnica para prevenir tales situaciones.
원인
La recepción del flujo también suele llevar mucho tiempo, y para archivos grandes, una única solicitud puede procesarse durante varios minutos. En tales casos, es crucial procesar las operaciones posteriores a la recepción con la mayor rapidez posible. Este escenario de ejemplo es la recepción del flujo, el guardado como archivo temporal y la inserción (push) en un contenedor. Si la parte de inserción del archivo temporal en el contenedor se maneja con un pool de workers (worker pool), se puede acortar el retraso de la respuesta.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Directorio temporal del Host
14
15// UploadTask contiene datos para la inserción asíncrona de archivos.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler procesa las subidas de archivos. Guarda en un archivo temporal, luego pone en cola para la inserción en Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Crea una ruta de archivo temporal única en el host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Crea y copia el cuerpo de la solicitud al archivo temporal (síncrono)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Pone en cola la tarea para la inserción asíncrona en Incus
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //THIS PART
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Envía una respuesta inmediata 202 Accepted
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Al observar la parte marcada como THIS PART, habrá notado que la tarea se inserta en una cola.
Ahora veamos cómo funciona la cola de tareas.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue inicializa la cola de tareas en memoria.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask añade un UploadTask a la cola.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask recupera un UploadTask de la cola, bloqueando si está vacía.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength devuelve el tamaño actual de la cola.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
La cola de tareas proporcionada como ejemplo está implementada de manera simple. Esta cola de tareas tiene una estructura simple que extrae las tareas que están en la cola desde un channel.
A continuación, se muestra el método del worker para la inserción en el contenedor después de la subida del archivo. Actualmente, el método utiliza un bucle infinito para una buena capacidad de respuesta y una implementación sencilla, pero se pueden agregar algoritmos según el propósito.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Aplaza la limpieza del archivo temporal
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Procesa la tarea con reintentos para errores transitorios de Incus
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
En primer lugar, esta función intenta continuamente obtener tareas de la cola mientras ejecuta un bucle. Luego, intenta la etapa de subida de archivo temporal a contenedor (no de flujo a archivo temporal) dentro del rango de reintentos.
이점
La ventaja de este método de procesamiento es que, si la subida del flujo es exitosa, se puede reducir el tiempo de latencia para las tareas que se procesan posteriormente y se puede prevenir el agotamiento de recursos causado por las operaciones concurrentes del contenedor. Como se ve en el código actual, las operaciones concurrentes del contenedor que se pueden llevar a cabo están limitadas por el número de channels. De esta manera, hemos examinado un ejemplo práctico de cómo utilizar el procesamiento paralelo de Go. Si desea ver más ejemplos, visite los siguientes enlaces.Módulo que incluye ejemplos Proyecto que utiliza los ejemplos Dado que el proyecto en sí tiene muchos componentes accesorios, para aprender sobre el worker, solo necesita echar un vistazo breve a cómo se llama la función init del worker en main.go y seguir adelante. El módulo también incluye otros tipos de workers, así que consúltelo.