Web Sunucusunun Aldığı İstekleri Ne Kadar Sürede İşlediğini Kafka ve Go ile Kaydetme

Halil İbrahim Kocaöz
6 min readAug 6, 2021

--

ASP.NET ile çalışan bir WebAPI serverinin aldığı istekleri ne kadar sürede işlediğini ActionFilter ve Middleware kullanarak tutacağız, ve bu kayıtları hem Kafka’ya yollayıp hem de bir dosyaya yazarak kayıt altına alacağız. Kafka’ya yollanmış mesajları ise Go’da yazılmış bir program yardımı ile okuyup, database’e aktaracağız. En sonundaysa bu kayıtları bir dashboard’da görüntüleyeceğiz.

ASP.NET WebAPI projesi

Web API’mızın iki tane end-point route’u olacak;

  • /api/products
    GET, POST, PUT, DELETE metotları ile istekler alabilecek ve dummy data oluşturmak için kullanılacak. Geri dönüşleri sadece 204 olacak ve içerisinde logic bir işlem yapılmayacak fakat bu metotlara giden istekler, 0 ile 3000 ms arası bekletilecek.
  • /health/api/products
    Son bir saatte /api/products’a giden isteklerin ne kadar sürede işlendiğinin bilgisini database’den alıp, sunacak.

ASP.NET WebAPI projesinin oluşturulması

dotnet new webapi -o server -n Kafka.Example

İhtiyacımız olan paketlerin eklenmesi

cd server
dotnet add package Confluent.Kafka
dotnet add package Npgsql

  • Npgsql
    Postgres database için kullanacağımız kütüphane.
  • Confluent.Kafka
    Kafka serverımız ile iletişim kurarken, Produce işlemleri için kullanacağımız kütüphane.

ResponseLog.cs

Database’den response sonuçlarını almak istediğimiz zaman kullandığımız model

Models/ResponseLog.cs

Database’den gelen bilgiler ile end-point üzerinden sunulacak objelerin oluşturulmasını sağlaması için bir constructor’ı var.

Services

  • FileLogService
    Gelen mesajı bir dosyaya kaydetmek ve kaydettiği bilgiyi, konsolda bildirmek.
  • KafkaLogService
    Gelen mesajı belirlenen bir topic’e göndermek(Produce etmek) ve gönderdiği bilgiyi konsolda bildirmek.

Bu her iki servis başka bir abstraction sınıftan kalıtım alıyor; LogService.
LogService ise, bu iki servisin ortak olarak kullanacağı ihtiyaçları belirliyor.

LogService.cs

Services/Abstract dizininin altına, LogService adında bir abstract class oluşturalım.

Services/Abstract/LogService.cs

FileLogService.cs

Services dizinin altına, FileLogService adında LogService’den kalıtım alan bir sınıf oluşturalım.

Services/FileLogService.cs

SendAsync metotunun FileLogService için yaptığı işlem; logFilePath variable’ına göre, ResponseLog tarafından gelen mesajın bir dosyaya kaydedilmesi.

KafkaLogService.cs

Services/KafkaLogService.cs

SendAsync metotunun KafkaLogService için yaptığı işlem; ResponseLog tarafından gelen mesajın sabit olarak belirlenen topic’e gönderilmesi.

ActionFilters

  • Delayer
    Delayer attribute’unu alan end-pointler’in rastgele bir değer aralığında bekletilmesini sağlamak için var.
  • TimeTracker
    Yaptığı tek bir iş var, oda TimeTracker attribute’unu alan bir yere istek giderse, isteğin end-point’e ilk ulaştığı süreyi HttpContext.Items dictionarysi içinde tutmak.

Delayer.cs

Filters/Delayer.cs

TimeTracker.cs

Filters/TimeTracker.cs

Routes

Yukarıda bahsettiğim gibi iki farklı routeumuz olacak ve bunların biri dummy data oluşturmak için kullanılacak. Diğeriyse bu dummy datayı sunmak için kullanılacak.

Products

Controllers altına, Products adında bir Controller oluşturalım.

Controllers/ProductsController.cs

Önceden yazdığımız TimeTracker ve Delayer ActionFilter’larını bu Controller’ın bütün metotlarında geçerli olacak şekilde belirttim.

Artık bu Controller altındaki herhangi bir end-point’e gelen request’in ne zaman ulaştığı tutulacak ve 0 ile 3 saniye arasında bekletilecek.

Health

Controllers altına, Health adında bir Controller oluşturalım.

Controllers/HealthController.cs

Kafka’dan gelen mesajları aktardığımız database’e Npgsql kütüphanesi ile basit bir select sorgusu yapıyoruz ve bu sorguyu bir saat önceye kadar oluşturulmuş verilere göre kısıtlayıp, reader ile listeye aktarıp listede bir değer varsa listeyi dönüyoruz, yoksa 204 ile dönüşü sağlıyoruz.

Startup.cs

Yazdığımız servisleri DI ile eklemek, CORS’u belirtmek, kendi yazdığımız bir Middleware’ı kullanmak ve Dashboard ile kullanacağımız static ve default dosyaları kullanmasını belirtmemiz için Starup.cs dosyasına bi’ uğramamız gerekiyor.

Burada yaptığım işlemler(DI ve CORS) çok temel fakat çok iyi öğrenilmesi gereken bilgiler/yaklaşımlardır. Bunun sorumluluğunu ben almak istemiyorum ama sizi öğrenirken kullandığım kaynaklardan bir kaçına yönlendirebilirim.

Bunlar dışındaysa, yazdığımız ve birazdan göreceğiniz Middleware’ı kullanmasını söylüyoruz.

ResponseLoggerMiddleware

Tam olarak her şeyin tetikleneceği yerdeyiz,
bu middleware gelen istekleri karşılıyor ve isteğin gittiği yerde TimeTracker attribute’u varsa, timeBeforeProcessEndpoint değeri default olmayan bir değer olacak.

timeBeforeProcessEndpoint değeri default değilse, o anı Unix ms olarak timeAfterProcessEndpoint variable’ına atıyoruz ve sonrasındaysa,

timeBeforeProcessEndpointtimeAfterProcessEndpoint ile elapsedTime’ı elde ediyoruz. Bu sayede isteğin end-point içinde ne kadar sürede işlendiğine ve ne kadar sürede isteğe bir karşılık vermeye başladığımızı elde ediyoruz.

İsteği alan metot adını, geçen süreyi ve o anki zamanı alarak bir string oluşturuyoruz ve bu stringi KafkaLog ve FileLog’un SendAsync metotlarına yolluyoruz.

Sonrasındaysa, Kafka producer Kafka’ya mesajını yolluyor, FileLog ise bir stream üzerinden dosyaya yazıyor.

Go ile Kafka mesajlarını okumak ve Database’ın güncellenmesi

ASP.NET WebAPI tarafından gönderdiğimiz Kafka mesajlarının consume edilmesi(okunması) ve database’e yazılmasına ihtiyacımız var. Bunun için ise basit bir programı Go ile yazdım.

İlk önce tanımlanmış global değişkenlere göz atalım,

maxMessageCountToAccumulate, bu variable ile Kafka’dan N kadar mesajı okuduktan sonra bir transaction başlatıp, bu mesajları database’e aktarmasını sağlıyoruz. Aynı şekilde bu const değer, Kafka mesajlarının toplanacağı kafkaMessages’ın length’ını de belirlemek için kullanılıyor.

insertStatement, klasik database insert cümleciği. Database ve Kafka server ile alakalı diğer sabit değişkenler ise gayet açık.

receivedMessageCount ise, Kafka’dan consume ettiğimiz mesaj sayısını tutuyor. Her transaction sonlandığında bu değer tekrardan sıfırlanıyor. Database transaction’unun başlamasını tetikleyen kontrol maxMessageCounToAccumulate ile receivedMessageCount arasında yapılıyor.

initDatabase

net_logs adında bir tablo yoksa, belirtilen özellikler ile bir tablo oluşturuyor.

insertMessages

kafkaMessages içindeki her mesajı split ederek transaction’a insert cümleciği olarak ekliyor ve en sonundaysa receivedMessageCount’u sıfırlayıp, transaction’u commit ediyor.

consumeAndInsertMessages

Kafka’dan gelen mesajları, consume edip onları kafkaMessages içine alarak print ediyor. Eğer receivedMessageCount eşit veya büyükse maxMessageCountToAccumulate’den, insertMessages fonksiyonunu çağırıyor.

main

Bir database bağlantısı oluşturup, onu açıyor. Ardından her ihtimale karşı bir süre bekleyip, Kafka server için yeni bir consumer oluşturuyor ve sonraysa sonsuz bir döngü ile consume etmeye başlıyor. Eğer consume veya database insert işlemlerinde hata oluşursa bunu konsola verip, tekrardan sonsuz bir döngü ile consume etmeye devam ediyor.

Dashboard

Aslında Dashboard ile ilgili aktarabileceğim pek bir şey yok. Çünkü JavaScript hakim olduğum bir dil değil. Yaptıklarımı anlatabilirim ama yaptıklarımı olması gerektiği yollarla yaptığımı düşünmediğim için, Dashboard tarafındaki JavaScript kodlarından bahsedip, herhangi birini yanlış yönlendirmek istemiyorum. Fakat siz yine de inceleyebilirsiniz: Dashboard.

En sonunda elde ettiğimiz

Bu projede görebildiğim eksikler;

  • Yapmak istediğimiz işlem için ilişkisel bir veritabanını kullanmak gereksiz, NoSQL bir database tercih edilmesi daha iyi olurdu.
  • Topic bir sabit olarak tutuluyor, ResponseLog ile hangi topic için işlem yapılacağı belirtilebilir.
    ResponseLog ile gelecek topic bilgisi ile hangi end-point route’undan geldiği bilgisi belirtilebilir ve ona ait bir dosyaya veya ona ait bir kafka topic’ine bu response log aktarılır.
    Örnek; şuanda sadece tek bir route’u izliyoruz ve bu route(/api/products). Herhangi yeni bir route’u izlemek istediğimiz zaman, bunun loglarını aynı topicte ve aynı dosyada tutmamız gerekecek, bunu yapabiliriz ama artık verilerimiz anlamlı bir veri olmaktan çıkar.
  • Aynı topicte tutarak, anlamlı/ayırt edici verileri nasıl elde edebiliriz?
    Farklı routelardan gelen ResponseLog’u aynı topicte tutarak ayırt edici hale getirebiliriz. Bunun için, ResponseLog’a yeni bir property ekleriz ve bu property, route bilgisini tutar ve artık ResponseLog mesajını şu hale getirebiliriz;
    /API/PRODUCTS GET 156 1628171244941
    /API/USERS PUT 50 1628171244941
    Bu değişiklikler sonrası, database tablomuza route’un ayırt edilmesi için bir alan daha eklemeliyiz ve Go ile yazdığımız consumer ve database güncelliyici buna göre şekillenmelidir. Aynı zamanda Health controller’ı ile sunulan verilerinde dışarıdan bir parametre ile filtrelenebilir olması gereklidir.
  • Dosyaya yazma işleminin Web Server tarafından yapılması yerine, consumer tarafından hem database’e insert etme hem de dosyaya yazma işlemi yapılabilirdi.
  • Dashboard’a veri sunuşunun optimize edilmesi gerekiyor, server tarafından sadece son bir saatte olan requestleri filtreleyip sunuyoruz ve Dashboard tarafında ise her seferinde son bir saatlik verileri gruplayıp onları chart’a veriyoruz. Çok fazla istek alan bir server için pek efektif bir yol değil.
    Eğer Dashboard ilk defa veri alıyorsa son bir saatlik verileri alarak, sonraki istekler içinse en son isteğin yapıldığı süreden sonraki kayıtların getirilmesi istenerek chart sadece yeni gelen verilerle birlikte güncellenerek, bu sorun çözülebilir.

--

--