GoSuda

Mengurangi Waktu Respons Endpoint di Go – Memanfaatkan Antrean Tugas

By Yunjin Lee
views ...

Gambaran Umum

Ketika pertama kali belajar Go, seringkali kita belajar dengan mengimplementasikan server backend. Mari kita ambil contoh kasus di mana kita membuat contoh yang menerima stream file dari REST API dan mengunggahnya ke server. Server Go net/http secara default menangani beberapa permintaan secara bersamaan, sehingga tidak ada masalah dengan unggahan bersamaan itu sendiri. Namun, jika semua operasi setelah menerima stream diproses secara sinkron, respons endpoint akan tertunda. Mari kita pelajari teknik untuk mencegah situasi seperti ini.

Penyebab

Menerima stream umumnya membutuhkan waktu yang lama, dan untuk file besar, satu permintaan dapat diproses selama beberapa menit. Dalam kasus seperti ini, penting untuk memproses operasi setelah penerimaan secepat mungkin. Skenario contoh ini adalah menerima stream, menyimpannya sebagai file sementara, dan mendorongnya ke Container. Pada titik ini, jika bagian yang mendorong file sementara ke Container diproses dengan Worker Pool, penundaan respons dapat dipersingkat.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName: containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Pada bagian yang ditandai dengan THIS PART, Anda mungkin menyadari bahwa tugas tersebut dimasukkan ke dalam antrean.

Sekarang mari kita lihat bagaimana antrean tugas bekerja.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Antrean tugas yang disediakan sebagai contoh diimplementasikan secara sederhana. Antrean tugas ini memiliki struktur sederhana untuk mengeluarkan tugas-tugas yang ada di antrean dari Channel.

Berikut adalah metode Worker untuk mendorong file ke Container setelah pengunggahan file. Metode saat ini adalah loop tak terbatas untuk respons yang baik dan implementasi yang mudah, tetapi Anda dapat menambahkan algoritma tergantung pada tujuan.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Pertama, fungsi ini terus-menerus melakukan loop dan mencoba menerima tugas dari antrean. Setelah itu, dalam batas percobaan ulang, ia mencoba langkah pengunggahan dari file sementara ke Container, bukan dari stream ke file sementara.

Keuntungan

Keuntungan dari metode pemrosesan ini adalah, jika pengunggahan stream berhasil, waktu tunda untuk pemrosesan selanjutnya dapat dikurangi, dan kehabisan sumber daya akibat operasi Container yang bersamaan dapat dicegah. Seperti yang terlihat dalam kode saat ini, operasi Container yang dapat berjalan secara bersamaan dibatasi oleh jumlah Channel. Dengan demikian, kita telah mempelajari contoh praktis bagaimana Go dapat digunakan untuk pemrosesan paralel. Jika Anda ingin melihat lebih banyak contoh, silakan kunjungi tautan di bawah ini.Modul termasuk contoh Proyek yang menggunakan contoh Proyek itu sendiri memiliki banyak komponen tambahan, jadi untuk belajar tentang Worker itu sendiri, Anda hanya perlu melihat secara singkat bagaimana fungsi inisialisasi Worker dipanggil di main.go. Modul ini juga mencakup jenis Worker lain, jadi harap diperhatikan.