GoSuda

Verkorting van de responstijd van de endpoint in Go - Benutting van een job queue

By Yunjin Lee
views ...

Overzicht

Wanneer men Go voor het eerst leert, wordt dit vaak gedaan door het implementeren van een backend server. Laten wij het geval nemen waarin een voorbeeld wordt gemaakt waarbij een bestandstroom (file stream) wordt ontvangen van een RestAPI, et cetera, en naar de server wordt geüpload. De Go-taal net/http server verwerkt standaard gelijktijdig meerdere verzoeken, dus er is geen probleem met de gelijktijdige (simultane) upload zelf. Echter, indien alle operaties na de stroomontvangst synchroon worden verwerkt, treedt vertraging op in de respons van de endpoint. Laten wij de technieken bestuderen om dergelijke situaties te voorkomen.

Oorzaak

Het ontvangen van de stroom kost doorgaans ook veel tijd, en bij grote bestanden kan een enkel verzoek minutenlang in behandeling zijn. In dergelijke gevallen is het cruciaal om de operaties na de ontvangst, zelfs al is het marginaal, spoedig af te handelen. Dit voorbeeldscenario betreft het opslaan van de stroom in een tijdelijk bestand na ontvangst en het vervolgens pushen naar een container. Indien het onderdeel dat het tijdelijke bestand naar de container pusht, wordt afgehandeld door een worker pool, kan de responsvertraging worden gereduceerd.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Tijdelijke directory op de host
14
15// UploadTask bevat gegevens voor asynchroon bestandspushen.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler verwerkt bestandsuploads. Slaat op in tijdelijk bestand, plaatst vervolgens in wachtrij voor Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DIT GEDEELTE
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Als men kijkt naar het gedeelte aangeduid als THIS PART, zal men opmerken dat de taak in een wachtrij wordt ingevoegd.

Laten wij nu aanschouwen hoe de taakwachtrij functioneert.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialiseert de in-memory taakwachtrij.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask voegt een UploadTask toe aan de wachtrij.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask haalt een UploadTask uit de wachtrij, blokkerend indien leeg.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength retourneert de huidige grootte van de wachtrij.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

De als voorbeeld gegeven taakwachtrij is op eenvoudige wijze geïmplementeerd. Deze taakwachtrij bezit een simpele structuur die de taken die zich in de wachtrij bevinden uit het channel haalt.

Hieronder volgt de worker methode ter facilitering van het pushen naar de container na de bestandsupload. De huidige methode is een oneindige loop ten behoeve van goede responsiviteit en eenvoudige implementatie, maar het is geoorloofd om algoritmen toe te voegen, afhankelijk van het specifieke gebruik.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Opschoning van het tijdelijke bestand uitstellen
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Taak verwerken met hertries voor vluchtige Incus-fouten
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Vooreerst poogt deze functie continu in een lus te draaien om taken uit de wachtrij te verkrijgen. Vervolgens wordt de uploadfase van tijdelijk bestand naar container geprobeerd, binnen het bereik van hertries, en niet de stap van stroom naar tijdelijk bestand.

Voordelen

Het voordeel van deze verwerkingswijze is dat, mits de stroomupload correct verloopt, de latentie voor de achtereenvolgende operaties kan worden verminderd, en bovendien kan de uitputting van resources als gevolg van gelijktijdige containeroperaties worden voorkomen. Zoals blijkt uit de huidige code, wordt het aantal gelijktijdig uitvoerbare containertaken beperkt door de capaciteit van het channel. Wij hebben zodoende een praktisch voorbeeld onderzocht waarin de parallelle verwerkingsmogelijkheden van Go benut kunnen worden. Indien men meer voorbeelden wenst te aanschouwen, gelieve de onderstaande koppelingen te raadplegen.Module met voorbeelden Project dat de voorbeelden benut Aangezien het project zelf vele nevencomponenten bevat, volstaat het voor het bestuderen van de worker zelf om kortstondig te bekijken hoe de init-functie van de worker in main.go wordt aangeroepen. Gelieve er rekening mee te houden dat de module ook andere varianten van workers bevat.