GoSuda

Redusere endepunktets responstid i Go – Bruk av oppgavekøer

By Yunjin Lee
views ...

Oversikt

Når man først lærer Go, er det ofte slik at man lærer det ved å implementere en Backend-server. La oss ta et eksempel der man lager et eksempel som mottar en filstrøm via RestAPI og laster den opp til serveren. Go-språkets net/http-server behandler som standard flere forespørsler samtidig, så det er ingen problemer med selve den samtidige opplastingen. Men hvis alle operasjoner etter mottak av strømmen behandles synkront, vil endepunktets respons bli forsinket. La oss undersøke en teknikk for å forhindre en slik situasjon.

Årsak

Mottak av strømmer tar generelt lang tid, og for store filer kan en enkelt forespørsel behandles i flere minutter. I slike tilfeller er det viktig å behandle operasjoner etter mottak så raskt som mulig. Dette eksempelet beskriver et scenario der en strøm mottas, lagres som en midlertidig fil, og deretter pushes til en Container. Hvis denne delen, der den midlertidige filen pushes til Containeren, behandles med en Worker Pool, kan responstiden reduseres.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Midlertidig katalog på verten
14
15// UploadTask inneholder data for asynkron filpush.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler behandler filopplastinger. Lagrer til midlertidig fil, deretter køer for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST-metode kreves.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Mangler X-File-Path eller X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "Filbanen må være absolutt.", http.StatusBadRequest)
40		return
41	}
42
43	// Opprett unik midlertidig filbane på verten
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("FEIL: Kunne ikke opprette midlertidig opplastingskatalog: %v", err)
49		http.Error(wr, "Serverfeil.", http.StatusInternalServerError)
50		return
51	}
52
53	// Opprett og kopier forespørselskroppen til midlertidig fil (synkront)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("FEIL: Kunne ikke opprette midlertidig fil: %v", err)
57		http.Error(wr, "Serverfeil.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("FEIL: Kunne ikke kopiere forespørselskroppen til midlertidig fil: %v", err)
67		http.Error(wr, "Filoverføring mislyktes.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Opplastingsinfo: Mottok %d bytes, lagret til %s.", bytesWritten, hostTempFilePath)
71
72	// Legg til oppgaven i køen for asynkron Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DENNE DELEN
80	log.Printf("Opplastingsinfo: Oppgave satt i kø for %s til %s.", originalFilePath, containerName)
81
82	// Send umiddelbar 202 Accepted respons
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "Filen '%s' er satt i kø for behandling på container '%s'.\n", originalFilename, containerName)
85}

Du har sannsynligvis lagt merke til at delen merket "THIS PART" indikerer at en oppgave settes i kø.

La oss nå se hvordan oppgavekøen fungerer.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialiserer den interne oppgavekøen.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Opplastingsinformasjon: Arbeidskø initialisert.")
16	})
17}
18
19// EnqueueTask legger til en UploadTask i køen.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("FEIL: Oppgavekøen er ikke initialisert.")
23	}
24	taskQueue <- task
25	log.Printf("Opplastingsinformasjon: Kø: Oppgave lagt til i kø. Størrelse: %d", len(taskQueue))
26}
27
28// DequeueTask henter en UploadTask fra køen, og blokkerer hvis den er tom.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("FEIL: Oppgavekøen er ikke initialisert.")
32	}
33	task := <-taskQueue
34	log.Printf("Opplastingsinformasjon: Kø: Oppgave fjernet fra kø. Størrelse: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returnerer gjeldende køstørrelse.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Oppgavekøen som er gitt som eksempel er enkelt implementert. Denne oppgavekøen har en enkel struktur der den henter oppgaver fra Channelen som er i køen.

Nedenfor er Worker-metoden for å pushe filen til Containeren etter filopplasting. Den nåværende metoden er en uendelig løkke for god respons og enkel implementering, men det er greit å legge til algoritmer avhengig av formålet.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Opplastingsinformasjon: Worker behandler oppgave for %s fra %s.", task.ContainerName, task.HostFilename)
 5
 6		// Utsett opprydding av den midlertidige filen
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("FEIL: Worker: Kunne ikke fjerne midlertidig fil '%s': %v", filePath, err)
10			} else {
11				log.Printf("Opplastingsinformasjon: Worker: Ryddet opp i midlertidig fil: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Behandle oppgaven med forsøk på nytt for midlertidige Incus-feil
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separat opplastings-oppgave
18			if err == nil {
19				log.Printf("SUKSESS: Worker: Oppgave fullført for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Eksempel på permanent feil
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("ADVARSEL: Worker: Oppgave mislyktes for %s (forsøk %d/%d): %v. Prøver på nytt.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("FEIL: Worker: Oppgave mislyktes permanent for %s etter %d forsøk: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Først vil denne funksjonen kontinuerlig kjøre i en løkke og forsøke å motta Tasks fra køen. Deretter, innenfor gjenforsøksområdet, vil den forsøke opplastingstrinnet fra midlertidig fil til Container, i stedet for fra strøm til midlertidig fil.

Fordeler

Fordelen med denne behandlingsmetoden er at hvis strømopplastingen er normal, kan forsinkelsen i den påfølgende behandlingen reduseres, og ressursmangel forårsaket av samtidige Container-operasjoner kan forebygges. Som det fremgår av den nåværende koden, er samtidige Container-operasjoner begrenset av antall Channels. Vi har sett et praktisk eksempel på hvordan Go's Parallel Processing kan brukes. Hvis du ønsker å se flere eksempler, vennligst besøk lenkene nedenfor.Modul inkludert eksempler Prosjekt som bruker eksemplene Selve prosjektet inneholder mange tilleggskomponenter, så for å lære om Workeren selv, trenger du bare å se kort på hvordan Worker Init-funksjonen kalles i main.go. Modulen inkluderer også andre typer Workers, så vær oppmerksom på det.