Web Sunucusunun Aldığı İstekleri Ne Kadar Sürede İşlediğini Kafka ve Go ile Kaydetme
ASP.NET ile çalışan bir WebAPI serverinin aldığı istekleri ne kadar sürede işlediğini ActionFilter ve Middleware kullanarak tutacağız, ve bu kayıtları hem Kafka’ya yollayıp hem de bir dosyaya yazarak kayıt altına alacağız. Kafka’ya yollanmış mesajları ise Go’da yazılmış bir program yardımı ile okuyup, database’e aktaracağız. En sonundaysa bu kayıtları bir dashboard’da görüntüleyeceğiz.
ASP.NET WebAPI projesi
Web API’mızın iki tane end-point route’u olacak;
- /api/products
GET, POST, PUT, DELETE metotları ile istekler alabilecek ve dummy data oluşturmak için kullanılacak. Geri dönüşleri sadece 204 olacak ve içerisinde logic bir işlem yapılmayacak fakat bu metotlara giden istekler, 0 ile 3000 ms arası bekletilecek. - /health/api/products
Son bir saatte /api/products’a giden isteklerin ne kadar sürede işlendiğinin bilgisini database’den alıp, sunacak.
ASP.NET WebAPI projesinin oluşturulması
dotnet new webapi -o server -n Kafka.Example
İhtiyacımız olan paketlerin eklenmesi
cd server
dotnet add package Confluent.Kafka
dotnet add package Npgsql
- Npgsql
Postgres database için kullanacağımız kütüphane. - Confluent.Kafka
Kafka serverımız ile iletişim kurarken, Produce işlemleri için kullanacağımız kütüphane.
ResponseLog.cs
Database’den response sonuçlarını almak istediğimiz zaman kullandığımız model
Database’den gelen bilgiler ile end-point üzerinden sunulacak objelerin oluşturulmasını sağlaması için bir constructor’ı var.
Services
- FileLogService
Gelen mesajı bir dosyaya kaydetmek ve kaydettiği bilgiyi, konsolda bildirmek. - KafkaLogService
Gelen mesajı belirlenen bir topic’e göndermek(Produce etmek) ve gönderdiği bilgiyi konsolda bildirmek.
Bu her iki servis başka bir abstraction sınıftan kalıtım alıyor; LogService.
LogService ise, bu iki servisin ortak olarak kullanacağı ihtiyaçları belirliyor.
LogService.cs
Services/Abstract dizininin altına, LogService adında bir abstract class oluşturalım.
FileLogService.cs
Services dizinin altına, FileLogService adında LogService’den kalıtım alan bir sınıf oluşturalım.
SendAsync metotunun FileLogService için yaptığı işlem; logFilePath
variable’ına göre, ResponseLog tarafından gelen mesajın bir dosyaya kaydedilmesi.
KafkaLogService.cs
SendAsync metotunun KafkaLogService için yaptığı işlem; ResponseLog tarafından gelen mesajın sabit olarak belirlenen topic
’e gönderilmesi.
ActionFilters
- Delayer
Delayer attribute’unu alan end-pointler’in rastgele bir değer aralığında bekletilmesini sağlamak için var. - TimeTracker
Yaptığı tek bir iş var, oda TimeTracker attribute’unu alan bir yere istek giderse, isteğin end-point’e ilk ulaştığı süreyi HttpContext.Items dictionarysi içinde tutmak.
Delayer.cs
TimeTracker.cs
Routes
Yukarıda bahsettiğim gibi iki farklı routeumuz olacak ve bunların biri dummy data oluşturmak için kullanılacak. Diğeriyse bu dummy datayı sunmak için kullanılacak.
Products
Controllers altına, Products adında bir Controller oluşturalım.
Önceden yazdığımız TimeTracker ve Delayer ActionFilter’larını bu Controller’ın bütün metotlarında geçerli olacak şekilde belirttim.
Artık bu Controller altındaki herhangi bir end-point’e gelen request’in ne zaman ulaştığı tutulacak ve 0 ile 3 saniye arasında bekletilecek.
Health
Controllers altına, Health adında bir Controller oluşturalım.
Kafka’dan gelen mesajları aktardığımız database’e Npgsql kütüphanesi ile basit bir select sorgusu yapıyoruz ve bu sorguyu bir saat önceye kadar oluşturulmuş verilere göre kısıtlayıp, reader ile listeye aktarıp listede bir değer varsa listeyi dönüyoruz, yoksa 204 ile dönüşü sağlıyoruz.
Startup.cs
Yazdığımız servisleri DI ile eklemek, CORS’u belirtmek, kendi yazdığımız bir Middleware’ı kullanmak ve Dashboard ile kullanacağımız static ve default dosyaları kullanmasını belirtmemiz için Starup.cs dosyasına bi’ uğramamız gerekiyor.
Burada yaptığım işlemler(DI ve CORS) çok temel fakat çok iyi öğrenilmesi gereken bilgiler/yaklaşımlardır. Bunun sorumluluğunu ben almak istemiyorum ama sizi öğrenirken kullandığım kaynaklardan bir kaçına yönlendirebilirim.
- Wikipedia DI
- Microsoft DI
- Abdullah Uğraşkan DI — Uzun ve eğlenceli bir anlatımı olan YouTube videosu
- Halil Ibrahim Kalkan Best Practices DI
- Mozilla CORS
- Microsoft CORS
Bunlar dışındaysa, yazdığımız ve birazdan göreceğiniz Middleware’ı kullanmasını söylüyoruz.
ResponseLoggerMiddleware
Tam olarak her şeyin tetikleneceği yerdeyiz,
bu middleware gelen istekleri karşılıyor ve isteğin gittiği yerde TimeTracker attribute’u varsa, timeBeforeProcessEndpoint değeri default olmayan bir değer olacak.
timeBeforeProcessEndpoint değeri default değilse, o anı Unix ms olarak timeAfterProcessEndpoint variable’ına atıyoruz ve sonrasındaysa,
timeBeforeProcessEndpoint — timeAfterProcessEndpoint ile elapsedTime’ı elde ediyoruz. Bu sayede isteğin end-point içinde ne kadar sürede işlendiğine ve ne kadar sürede isteğe bir karşılık vermeye başladığımızı elde ediyoruz.
İsteği alan metot adını, geçen süreyi ve o anki zamanı alarak bir string oluşturuyoruz ve bu stringi KafkaLog ve FileLog’un SendAsync metotlarına yolluyoruz.
Sonrasındaysa, Kafka producer Kafka’ya mesajını yolluyor, FileLog ise bir stream üzerinden dosyaya yazıyor.
Go ile Kafka mesajlarını okumak ve Database’ın güncellenmesi
ASP.NET WebAPI tarafından gönderdiğimiz Kafka mesajlarının consume edilmesi(okunması) ve database’e yazılmasına ihtiyacımız var. Bunun için ise basit bir programı Go ile yazdım.
İlk önce tanımlanmış global değişkenlere göz atalım,
maxMessageCountToAccumulate, bu variable ile Kafka’dan N kadar mesajı okuduktan sonra bir transaction başlatıp, bu mesajları database’e aktarmasını sağlıyoruz. Aynı şekilde bu const değer, Kafka mesajlarının toplanacağı kafkaMessages’ın length’ını de belirlemek için kullanılıyor.
insertStatement, klasik database insert cümleciği. Database ve Kafka server ile alakalı diğer sabit değişkenler ise gayet açık.
receivedMessageCount ise, Kafka’dan consume ettiğimiz mesaj sayısını tutuyor. Her transaction sonlandığında bu değer tekrardan sıfırlanıyor. Database transaction’unun başlamasını tetikleyen kontrol maxMessageCounToAccumulate ile receivedMessageCount arasında yapılıyor.
initDatabase
net_logs adında bir tablo yoksa, belirtilen özellikler ile bir tablo oluşturuyor.
insertMessages
kafkaMessages içindeki her mesajı split ederek transaction’a insert cümleciği olarak ekliyor ve en sonundaysa receivedMessageCount’u sıfırlayıp, transaction’u commit ediyor.
consumeAndInsertMessages
Kafka’dan gelen mesajları, consume edip onları kafkaMessages içine alarak print ediyor. Eğer receivedMessageCount eşit veya büyükse maxMessageCountToAccumulate’den, insertMessages fonksiyonunu çağırıyor.
main
Bir database bağlantısı oluşturup, onu açıyor. Ardından her ihtimale karşı bir süre bekleyip, Kafka server için yeni bir consumer oluşturuyor ve sonraysa sonsuz bir döngü ile consume etmeye başlıyor. Eğer consume veya database insert işlemlerinde hata oluşursa bunu konsola verip, tekrardan sonsuz bir döngü ile consume etmeye devam ediyor.
Dashboard
Aslında Dashboard ile ilgili aktarabileceğim pek bir şey yok. Çünkü JavaScript hakim olduğum bir dil değil. Yaptıklarımı anlatabilirim ama yaptıklarımı olması gerektiği yollarla yaptığımı düşünmediğim için, Dashboard tarafındaki JavaScript kodlarından bahsedip, herhangi birini yanlış yönlendirmek istemiyorum. Fakat siz yine de inceleyebilirsiniz: Dashboard.
En sonunda elde ettiğimiz
Bu projede görebildiğim eksikler;
- Yapmak istediğimiz işlem için ilişkisel bir veritabanını kullanmak gereksiz, NoSQL bir database tercih edilmesi daha iyi olurdu.
- Topic bir sabit olarak tutuluyor, ResponseLog ile hangi topic için işlem yapılacağı belirtilebilir.
ResponseLog ile gelecek topic bilgisi ile hangi end-point route’undan geldiği bilgisi belirtilebilir ve ona ait bir dosyaya veya ona ait bir kafka topic’ine bu response log aktarılır.
Örnek; şuanda sadece tek bir route’u izliyoruz ve bu route(/api/products). Herhangi yeni bir route’u izlemek istediğimiz zaman, bunun loglarını aynı topicte ve aynı dosyada tutmamız gerekecek, bunu yapabiliriz ama artık verilerimiz anlamlı bir veri olmaktan çıkar. - Aynı topicte tutarak, anlamlı/ayırt edici verileri nasıl elde edebiliriz?
Farklı routelardan gelen ResponseLog’u aynı topicte tutarak ayırt edici hale getirebiliriz. Bunun için, ResponseLog’a yeni bir property ekleriz ve bu property, route bilgisini tutar ve artık ResponseLog mesajını şu hale getirebiliriz;/API/PRODUCTS GET 156 1628171244941
Bu değişiklikler sonrası, database tablomuza route’un ayırt edilmesi için bir alan daha eklemeliyiz ve Go ile yazdığımız consumer ve database güncelliyici buna göre şekillenmelidir. Aynı zamanda Health controller’ı ile sunulan verilerinde dışarıdan bir parametre ile filtrelenebilir olması gereklidir.
/API/USERS PUT 50 1628171244941 - Dosyaya yazma işleminin Web Server tarafından yapılması yerine, consumer tarafından hem database’e insert etme hem de dosyaya yazma işlemi yapılabilirdi.
- Dashboard’a veri sunuşunun optimize edilmesi gerekiyor, server tarafından sadece son bir saatte olan requestleri filtreleyip sunuyoruz ve Dashboard tarafında ise her seferinde son bir saatlik verileri gruplayıp onları chart’a veriyoruz. Çok fazla istek alan bir server için pek efektif bir yol değil.
Eğer Dashboard ilk defa veri alıyorsa son bir saatlik verileri alarak, sonraki istekler içinse en son isteğin yapıldığı süreden sonraki kayıtların getirilmesi istenerek chart sadece yeni gelen verilerle birlikte güncellenerek, bu sorun çözülebilir.