GoSuda

Reduser endepunktresponstiden i Go – Bruk Work Queues

By Yunjin Lee
views ...

Oversikt

Når man først lærer Go, er det ofte slik at man lærer ved å implementere en backend-server. La oss ta et eksempel der man lager et eksempel som mottar en filstrøm fra RestAPI eller lignende og laster den opp til serveren. Go-språkets net/http-server behandler som standard flere forespørsler samtidig, så det er ingen problemer med samtidig opplasting i seg selv. Men hvis alle operasjoner etter mottak av strømmen behandles synkront, vil endepunktets respons bli forsinket. La oss utforske en teknikk for å forhindre en slik situasjon.

Årsak

Mottak av en strøm tar vanligvis lang tid, og for store filer kan en enkelt forespørsel behandles i flere minutter. I slike tilfeller er det viktig å behandle operasjonene etter mottak så raskt som mulig. Dette eksempelsenarioet er et scenario der strømmen mottas, lagres som en midlertidig fil, og deretter pushes til en container. I dette tilfellet, hvis delen som pusher den midlertidige filen til containeren behandles med en worker pool, kan responsforsinkelsen reduseres.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host midlertidig katalog
14
15// UploadTask inneholder data for asynkron filpush.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler behandler filopplastinger. Lagrer til midlertidig fil, deretter køes for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST-metoden er påkrevd.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Mangler X-File-Path eller X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "Filstien må være absolutt.", http.StatusBadRequest)
40		return
41	}
42
43	// Opprett unik midlertidig filsti på host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Klarte ikke å opprette midlertidig opplastingskatalog: %v", err)
49		http.Error(wr, "Serverfeil.", http.StatusInternalServerError)
50		return
51	}
52
53	// Opprett og kopier forespørselskroppen til midlertidig fil (synkront)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Klarte ikke å opprette midlertidig fil: %v", err)
57		http.Error(wr, "Serverfeil.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Klarte ikke å kopiere forespørselskroppen til midlertidig fil: %v", err)
67		http.Error(wr, "Filoverføring mislyktes.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Opplastingsinfo: Mottok %d bytes, lagret til %s.", bytesWritten, hostTempFilePath)
71
72	// Legg til oppgave i kø for asynkron Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DENNE DELEN
80	log.Printf("Opplastingsinfo: Oppgave satt i kø for %s til %s.", originalFilePath, containerName)
81
82	// Send umiddelbar 202 Accepted respons
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "Filen '%s' er satt i kø for behandling på container '%s'.\n", originalFilename, containerName)
85}

Her, hvis du ser på delen som er merket THIS PART, har du sannsynligvis lagt merke til at oppgaven settes i kø.

La oss nå se hvordan oppgavekøen fungerer.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialiserer den interne oppgavekøen.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Arbeidskø initialisert.")
16	})
17}
18
19// EnqueueTask legger til en UploadTask i køen.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Oppgavekø ikke initialisert.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Kø: Oppgave satt i kø. Størrelse: %d", len(taskQueue))
26}
27
28// DequeueTask henter en UploadTask fra køen, blokkerer hvis tom.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Oppgavekø ikke initialisert.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Kø: Oppgave tatt ut av kø. Størrelse: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returnerer gjeldende køstørrelse.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Eksempelet på oppgavekøen som er gitt, er enkelt implementert. Denne oppgavekøen har en enkel struktur som trekker ut oppgaver som er i køen fra kanalen.

Nedenfor er worker-metoden for å pushe til containeren etter filopplasting. Den nåværende metoden er en uendelig løkke for god respons og enkel implementering, men det er greit å legge til algoritmer avhengig av formålet.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker behandler oppgave for %s fra %s.", task.ContainerName, task.HostFilename)
 5
 6		// Utsett opprydding av den midlertidige filen
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Klarte ikke å fjerne midlertidig fil '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Ryddet opp i midlertidig fil: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Behandle oppgave med gjenforsøk for midlertidige Incus-feil
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separat opplastingsoppgave
18			if err == nil {
19				log.Printf("SUKSESS: Worker: Oppgave fullført for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Eksempel på permanent feil
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("ADVARSEL: Worker: Oppgave mislyktes for %s (forsøk %d/%d): %v. Prøver igjen.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Oppgave permanent mislyktes for %s etter %d forsøk: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Først vil denne funksjonen kontinuerlig kjøre i en løkke og forsøke å hente oppgaver fra køen. Deretter, innenfor gjenforsøksområdet, vil den forsøke opplastingstrinnet fra midlertidig fil til container, i stedet for strøm-til-midlertidig fil.

Fordeler

Fordelen med denne behandlingsmetoden er at hvis strømopplastingen er vellykket, kan forsinkelsen for påfølgende behandlede oppgaver reduseres, og ressursmangel forårsaket av samtidige containeroperasjoner kan forhindres. Som det fremgår av den nåværende koden, er samtidige containeroperasjoner begrenset av antall kanaler. Vi har sett et praktisk eksempel på hvordan man kan bruke Go's parallelle behandling på denne måten. Hvis du ønsker å se flere eksempler, vennligst besøk lenkene nedenfor.Modul som inkluderer eksempler Prosjekt som bruker eksempler Selve prosjektet inneholder mange tilleggskomponenter, så for å lære om worker-en i seg selv, er det nok å bare kort se hvordan worker init-funksjonen kalles i main.go. Modulen inkluderer også andre typer worker-e, så vennligst referer til den.