GoSuda

Go'da Uç Nokta Yanıt Sürelerini Kısaltma - İş Kuyruklarını Kullanma

By Yunjin Lee
views ...

개요

Go'yu ilk öğrenirken, genellikle bir backend sunucusu uygulayarak öğrenme durumuyla karşılaşılır. Bu durumda, RestAPI gibi kaynaklardan dosya akışlarını alıp sunucuya yükleyen bir örnek oluşturduğumuzu varsayalım. Go dilindeki net/http sunucusu, varsayılan olarak birden çok isteği eşzamanlı olarak işlediğinden, eşzamanlı çoklu yüklemelerde bir sorun bulunmamaktadır. Ancak, akış alımı sonrası tüm işlemler senkronize olarak gerçekleştirilirse, endpoint'in yanıtı gecikecektir. Bu tür bir durumu önlemek için kullanılabilecek teknikleri inceleyelim.

원인

Akışları almak genellikle uzun zaman alır ve büyük dosyalar söz konusu olduğunda tek bir istek birkaç dakika sürebilir. Bu gibi durumlarda, alım sonrası işlemleri mümkün olduğunca hızlı bir şekilde gerçekleştirmek önemlidir. Bu örnek senaryo, bir akışı aldıktan sonra geçici bir dosyaya kaydetme ve ardından bir konteynere itme senaryosudur. Bu noktada, geçici dosyayı konteynere itme kısmını bir worker pool ile işlersek, yanıt gecikmesini kısaltabiliriz.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Host üzerinde benzersiz geçici dosya yolu oluştur
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Geçici yükleme dizini oluşturulamadı: %v", err)
49		http.Error(wr, "Sunucu hatası.", http.StatusInternalServerError)
50		return
51	}
52
53	// İstek gövdesini oluştur ve geçici dosyaya kopyala (senkron)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Geçici dosya oluşturulamadı: %v", err)
57		http.Error(wr, "Sunucu hatası.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: İstek gövdesi geçici dosyaya kopyalanamadı: %v", err)
67		http.Error(wr, "Dosya transferi başarısız oldu.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Yükleme Bilgisi: %d bayt alındı, %s adresine kaydedildi.", bytesWritten, hostTempFilePath)
71
72	// Asenkron Incus itme için görevi kuyruğa al
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //BU KISIM
80	log.Printf("Yükleme Bilgisi: %s için %s adresine görev kuyruğa alındı.", originalFilePath, containerName)
81
82	// Anında 202 Accepted yanıtı gönder
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "'%s' dosyası '%s' konteynerinde işlenmek üzere kuyruğa alındı.\n", originalFilename, containerName)
85}

Burada "THIS PART" olarak işaretlenmiş kısma baktığınızda, bir görevin kuyruğa eklendiğini fark etmiş olmalısınız.

Şimdi, görev kuyruğunun nasıl çalıştığına bakalım.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue in-memory görev kuyruğunu başlatır.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Yükleme Bilgisi: İş kuyruğu başlatıldı.")
16	})
17}
18
19// EnqueueTask bir UploadTask'ı kuyruğa ekler.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Görev kuyruğu başlatılmadı.")
23	}
24	taskQueue <- task
25	log.Printf("Yükleme Bilgisi: Kuyruk: Görev kuyruğa alındı. Boyut: %d", len(taskQueue))
26}
27
28// DequeueTask kuyruktan bir UploadTask alır, boşsa engellenir.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Görev kuyruğu başlatılmadı.")
32	}
33	task := <-taskQueue
34	log.Printf("Yükleme Bilgisi: Kuyruk: Görev kuyruktan çıkarıldı. Boyut: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength mevcut kuyruk boyutunu döndürür.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Örnek olarak sunulan görev kuyruğu basit bir şekilde uygulanmıştır. Bu görev kuyruğu, kuyruktaki görevleri bir kanaldan çıkaran basit bir yapıya sahiptir.

Aşağıda, dosya yüklemesinin ardından konteynere itmek için kullanılan worker metodu bulunmaktadır. Mevcut metod, iyi bir yanıt hızı ve kolay uygulama için sonsuz bir döngüdür, ancak amaca göre algoritmalar eklenebilir.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Yükleme Bilgisi: İşçi, %s konteyneri için %s dosyasından görevi işliyor.", task.ContainerName, task.HostFilename)
 5
 6		// Geçici dosyanın temizliğini ertele
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: İşçi: Geçici '%s' dosyası kaldırılamadı: %v", filePath, err)
10			} else {
11				log.Printf("Yükleme Bilgisi: İşçi: Geçici dosya temizlendi: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Geçici Incus hataları için yeniden denemelerle görevi işle
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //ayrı yükleme görevi
18			if err == nil {
19				log.Printf("BAŞARI: İşçi: %s için görev tamamlandı.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Örnek kalıcı hata
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("UYARI: İşçi: %s için görev başarısız oldu (deneme %d/%d): %v. Yeniden deneniyor.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: İşçi: %s için görev %d denemeden sonra kalıcı olarak başarısız oldu: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Öncelikle, bu fonksiyon sürekli bir döngüde çalışarak kuyruktan görev almaya çalışır. Daha sonra, yeniden deneme aralığı içinde akış-geçici dosya yerine, geçici dosya-konteyner yükleme aşamasını dener.

이점

Bu işlem yaklaşımının avantajı, akış yüklemesi normal olduğu sürece sonraki işlemlerin gecikme süresini azaltabilmesi ve eşzamanlı konteyner işlemleri nedeniyle kaynak tükenmesini önleyebilmesidir. Mevcut kodda görüldüğü gibi, eşzamanlı olarak gerçekleştirilebilecek konteyner işlemleri kanal sayısı ile sınırlıdır. Bu şekilde, Go'nun paralel işleme yeteneğini pratik olarak kullanabileceğimiz bir örneği inceledik. Daha fazla örnek görmek isterseniz, aşağıdaki bağlantıları ziyaret ediniz.Örnekleri İçeren Modül Örnekleri Kullanan Proje Projenin kendisi birçok ek bileşen içerdiğinden, worker'ın kendisi hakkında bilgi edinmek için main.go dosyasında worker init fonksiyonunun nasıl çağrıldığına kısaca göz atmanız yeterlidir. Modül, başka türde worker'lar da içerdiğinden lütfen bunu da dikkate alınız.