GoSuda

Endpoint-responstijden verkorten in Go – Gebruik maken van een Job Queue

By Yunjin Lee
views ...

Overzicht

Bij het leren van Go is het vaak zo dat men begint met het implementeren van een backend-server. Laten we als voorbeeld een situatie nemen waarin men een voorbeeld maakt om een bestandsstroom te ontvangen en te uploaden naar de server via een RestAPI. Een Go-taal net/http-server verwerkt doorgaans meerdere verzoeken tegelijk, dus gelijktijdige uploads vormen op zich geen probleem. Echter, als alle bewerkingen na ontvangst van de stroom synchroon worden verwerkt, zal de respons van het endpoint vertraagd worden. Laten we een techniek onderzoeken om een dergelijke situatie te voorkomen.

Oorzaak

Het ontvangen van een stroom neemt doorgaans ook veel tijd in beslag, en bij grote bestanden kan één enkel verzoek minutenlang worden verwerkt. In dergelijke gevallen is het belangrijk om de bewerkingen na ontvangst zo snel mogelijk af te handelen. Dit voorbeeldscenario betreft het ontvangen van een stroom, deze opslaan als tijdelijk bestand en vervolgens naar een container pushen. Als het pushen van het tijdelijke bestand naar de container in dit geval via een worker pool wordt afgehandeld, kan de responsvertraging worden verkort.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DIT DEEL
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

U zult waarschijnlijk hebben opgemerkt dat het deel dat gemarkeerd is als THIS PART aangeeft dat de taak in een wachtrij wordt geplaatst.

Laten we nu eens kijken hoe de taakwachtrij werkt.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

De als voorbeeld gegeven taakwachtrij is eenvoudig geïmplementeerd. Deze taakwachtrij heeft een eenvoudige structuur die taken uit het kanaal haalt die in de wachtrij staan.

Hieronder staat de worker-methode voor het pushen van het bestand naar de container na de upload. De huidige methode is een oneindige lus voor een goede responsiviteit en eenvoudige implementatie, maar het staat vrij om algoritmen toe te voegen afhankelijk van het doel.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Allereerst probeert deze functie continu in een lus taken uit de wachtrij te halen. Daarna probeert het binnen het bereik van nieuwe pogingen de stap van tijdelijk bestand naar containerupload uit te voeren, in plaats van stroom naar tijdelijk bestand.

Voordelen

De voordelen van deze verwerkingsmethode zijn dat, mits de stream-upload normaal verloopt, de vertragingstijd voor daaropvolgende verwerkingstaken kan worden verminderd en uitputting van bronnen door gelijktijdige containerbewerkingen kan worden voorkomen. Zoals in de huidige code te zien is, wordt het aantal gelijktijdig uitvoerbare containerbewerkingen beperkt door het aantal kanalen. Op deze manier hebben we een praktisch voorbeeld bekeken van hoe Go's parallelle verwerking kan worden gebruikt. Als u meer voorbeelden wilt zien, bezoek dan de onderstaande links.Module inclusief voorbeelden Project dat voorbeelden gebruikt Het project zelf bevat veel bijkomende componenten, dus voor het leren van de worker zelf hoeft u alleen kort te kijken hoe de worker init-functie in main.go wordt aangeroepen. De module bevat ook andere vormen van workers, dus raadpleeg deze.