GoSuda

Skrátenie času odozvy koncového bodu v Go – využitie pracovných frontov

By Yunjin Lee
views ...

Prehľad

Pri prvom učení sa jazyka Go sa často stáva, že sa učíte implementáciou backend servera. Predstavme si napríklad, že vytvárate príklad, ktorý prijíma dátový prúd súboru z RestAPI a nahráva ho na server. Server Go net/http štandardne spracováva viacero požiadaviek súčasne, takže s paralelným nahrávaním nie sú žiadne problémy. Ak sa však všetky operácie po prijatí dátového prúdu spracovávajú synchrónne, odpoveď koncového bodu sa oneskorí. Pozrime sa na techniku, ako predísť takejto situácii.

Príčina

Prijímanie dátového prúdu si zvyčajne vyžaduje dlhý čas a v prípade veľkých súborov môže spracovanie jednej požiadavky trvať niekoľko minút. V takomto prípade je dôležité spracovať operácie po prijatí čo najrýchlejšie. Tento príklad scenára zahŕňa prijatie dátového prúdu, jeho uloženie do dočasného súboru a následné vloženie do kontajnera. V takomto prípade, ak sa časť vloženia dočasného súboru do kontajnera spracováva pomocou worker poolu, je možné skrátiť oneskorenie odpovede.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Všimli ste si, že časť označená ako THIS PART vkladá úlohu do fronty.

Teraz sa pozrime, ako funguje fronta úloh.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Príklad fronty úloh je implementovaný jednoducho. Táto fronta úloh má jednoduchú štruktúru, ktorá odoberá úlohy z kanála a vkladá ich do fronty.

Nižšie je metóda workera na vloženie súboru do kontajnera po jeho nahraní. Súčasná metóda je nekonečná slučka pre dobrú odozvu a jednoduchú implementáciu, ale podľa potreby je možné pridať algoritmy.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Najprv sa táto funkcia neustále pokúša získať úlohy z fronty v slučke. Následne sa pokúša o fázu nahrávania z dočasného súboru do kontajnera (nie zo dátového prúdu do dočasného súboru) v rámci rozsahu opakovaných pokusov.

Výhody

Výhodou tohto spôsobu spracovania je, že ak je nahrávanie dátového prúdu úspešné, je možné skrátiť oneskorenie spracovania následných úloh a zabrániť vyčerpaniu zdrojov v dôsledku súbežných operácií s kontajnermi. Ako je vidieť v súčasnom kóde, počet súbežných operácií s kontajnermi je obmedzený počtom kanálov. Takto sme preskúmali praktický príklad použitia súbežného spracovania v Go. Ak chcete vidieť viac príkladov, navštívte nasledujúce odkazy.Modul s príkladmi Projekt využívajúci príklady Samotný projekt obsahuje mnoho vedľajších komponentov, takže pre pochopenie workera stačí stručne pozrieť, ako funkcia init workera volá v main.go. Modul obsahuje aj iné typy workerov, takže si to prosím všimnite.