Go'da Uç Nokta Yanıt Sürelerini Kısaltma - İş Kuyruklarını Kullanma
Özet
Go öğrenmeye ilk başlandığında, sıklıkla bir Backend sunucusu uygulayarak öğrenme yoluna gidilir. Bu durumda, RestAPI gibi kaynaklardan bir dosya Stream'i alarak sunucuya yükleme örneği oluşturma senaryosunu ele alalım. Go dilindeki net/http sunucusu, temel olarak birden fazla isteği eşzamanlı olarak işlediğinden, eşzamanlı yüklemelerde herhangi bir sorun teşkil etmez. Ancak, Stream alındıktan sonraki tüm işlemlerin senkronize olarak yürütülmesi durumunda, Endpoint'in yanıtı gecikecektir. Bu tür durumları önlemek için kullanılan bir tekniği inceleyelim.
Neden
Bir Stream'i almak genellikle uzun zaman alır ve büyük dosyalar için tek bir istek birkaç dakika boyunca işlenebilir. Bu durumda, Stream alındıktan sonraki işlemleri mümkün olduğunca hızlı bir şekilde gerçekleştirmek önemlidir. Bu örnek senaryo, bir Stream'i aldıktan sonra geçici bir dosyaya kaydetme ve ardından bir Container'a Push etme senaryosudur. Bu aşamada, geçici dosyayı Container'a Push etme kısmının bir Worker Pool ile işlenmesi durumunda, yanıt gecikmesi kısaltılabilir.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host geçici dizini
14
15// UploadTask, asenkron dosya Push'u için verileri tutar.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler, dosya yüklemelerini işler. Geçici dosyaya kaydeder, ardından Incus Push'u için sıraya koyar.
24func UploadHandler(wr http.ResponseWriter, req *http.Method) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST metodu gereklidir.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "X-File-Path veya X-Container-Name başlığı eksik.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "Dosya yolu mutlak olmalı.", http.StatusBadRequest)
40 return
41 }
42
43 // Host üzerinde benzersiz geçici dosya yolu oluştur
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("HATA: Geçici yükleme dizini oluşturulamadı: %v", err)
49 http.Error(wr, "Sunucu hatası.", http.StatusInternalServerError)
50 return
51 }
52
53 // İstek gövdesini geçici dosyaya oluştur ve kopyala (senkron)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("HATA: Geçici dosya oluşturulamadı: %v", err)
57 http.Error(wr, "Sunucu hatası.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("HATA: İstek gövdesi geçici dosyaya kopyalanamadı: %v", err)
67 http.Error(wr, "Dosya aktarımı başarısız oldu.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Yükleme Bilgisi: %d bayt alındı, %s konumuna kaydedildi.", bytesWritten, hostTempFilePath)
71
72 // Asenkron Incus Push'u için görevi sıraya ekle
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //BU KISIM
80 log.Printf("Yükleme Bilgisi: %s için görev sıraya eklendi %s.", originalFilePath, containerName)
81
82 // Anında 202 Accepted yanıtı gönder
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "'%s' dosyası '%s' Container'ında işlenmek üzere sıraya alındı.\n", originalFilename, containerName)
85}
Burada THIS PART olarak işaretlenmiş kısımda, bir görevin kuyruğa eklendiğini fark etmiş olmalısınız.
Şimdi görev kuyruğunun nasıl çalıştığına bakalım.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue, bellek içi görev kuyruğunu başlatır.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Yükleme Bilgisi: İş kuyruğu başlatıldı.")
16 })
17}
18
19// EnqueueTask, bir UploadTask'ı kuyruğa ekler.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("HATA: Görev kuyruğu başlatılmadı.")
23 }
24 taskQueue <- task
25 log.Printf("Yükleme Bilgisi: Kuyruk: Görev sıraya eklendi. Boyut: %d", len(taskQueue))
26}
27
28// DequeueTask, kuyruktan bir UploadTask alır, boşsa engellenir.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("HATA: Görev kuyruğu başlatılmadı.")
32 }
33 task := <-taskQueue
34 log.Printf("Yükleme Bilgisi: Kuyruk: Görev kuyruktan çıkarıldı. Boyut: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength, mevcut kuyruk boyutunu döndürür.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Örnek olarak sağlanan görev kuyruğu basit bir şekilde uygulanmıştır. Bu görev kuyruğu, kuyruğa eklenen görevleri bir Channel'dan çıkaran basit bir yapıya sahiptir.
Aşağıda, dosya yüklemesinden sonra Container'a Push yapmak için kullanılan Worker metodu bulunmaktadır. Mevcut metod, iyi bir tepkisellik ve kolay bir uygulama için sonsuz bir döngü olsa da, amaca göre algoritmalar eklenebilir.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Yükleme Bilgisi: Worker, %s için %s'den gelen görevi işliyor.", task.ContainerName, task.HostFilename)
5
6 // Geçici dosyanın temizliğini ertele
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("HATA: Worker: Geçici dosya '%s' kaldırılamadı: %v", filePath, err)
10 } else {
11 log.Printf("Yükleme Bilgisi: Worker: Geçici dosya temizlendi: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Geçici Incus hataları için yeniden denemelerle görevi işle
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //Yükleme görevini ayır
18 if err == nil {
19 log.Printf("BAŞARI: Worker: %s için görev tamamlandı.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Kalıcı hata örneği
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("UYARI: Worker: %s için görev başarısız oldu (deneme %d/%d): %v. Yeniden deneniyor.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("HATA: Worker: %s için görev %d deneme sonra kalıcı olarak başarısız oldu: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Bu fonksiyon öncelikle sürekli bir döngü içinde görevleri kuyruktan almaya çalışır. Daha sonra, yeniden deneme aralığı içinde, Stream-geçici dosya değil, geçici dosya-Container'a yükleme aşamasını dener.
Avantajlar
Bu işlem yönteminin avantajı, Stream yüklemesi normal olduğu sürece sonraki işlemlerin gecikme süresini azaltması ve eşzamanlı Container işlemleri nedeniyle kaynak tükenmesini önlemesidir. Mevcut kodda görüldüğü gibi, aynı anda yürütülebilecek Container işlemleri Channel sayısı ile sınırlıdır. Bu şekilde, Go'nun eşzamanlı işlem yeteneklerini pratik olarak kullanabileceğimiz bir örnek inceledik. Daha fazla örnek görmek isterseniz, lütfen aşağıdaki bağlantıları ziyaret edin.Örnekleri içeren modül Örnekleri kullanan proje Projenin kendisi birçok yardımcı bileşen içerdiğinden, Worker'ın kendisi hakkında bilgi edinmek için main.go dosyasında Worker init fonksiyonunun nasıl çağrıldığına kısaca bakmanız yeterlidir. Modül, farklı Worker türlerini de içermektedir, bu yüzden lütfen bunu da göz önünde bulundurun.