Table/Collection-Per-Entity: Finansal Verilerde Enstrüman Bazlı Veri Barındırma Yaklaşımı
Finansal marketlerde her bir alınıp satılabilen varlık, hisse senedi, tahvil, emtia veya BIST 100 gibi gösterge, performans ölçüm amacıyla kullanılan kavramlar kendi içinde ayrı dinamiklere sahip olan varlıklardır ve çoğu zaman yüksek ölçekte veri trafiğine sebep olurlar.
Bahsedeceğim yaklaşım ise bu çeşitliliği ve ölçeklenebilirlik ihtiyacını karşılamak üzere her bir varlığa ait veriyi izole yapılarda tutarak, hem performansı artırmayı, hem I/O trafiğini düşürmeyi hem de bakım süreçlerini sadeleştirmeyi sağlayabiliriz. Bu yöntemle, tek bir büyük veri yapısındaki karmaşıklığı tablo, collection bazında ayırarak yük dengeleme ve arızaları daha izole bir şekilde ele almak gibi iyileştirmelerle sistemimizi daha iyi hale getirebiliriz.
Senaryo
Borsanın endekslerinin ve o borsada işlem gören hisselerin 1. seviyedeki verilerini tutacağımız bir yapıya ihtiyacımız var, bize temelde şu veriler lazım:
- Best Bid/Ask: En iyi alış, satış fiyatları ve büyüklükleri
- Last Trade Price & Size: Son gerçekleşen fiyat ve işlem büyüklüğü
- Last Trade At: En son işlem görme tarihi ve zamanı
Seviye 1 verilerinin hepsini bir yerde tuttuğumuzda karşımıza en iyi ihtimalle yukarıdaki gibi bir görüntü gelecektir.
İyileştirme
Bu tür verileri barındırmak için bunun dışında başka bir seçeneğimizin olması gerekir diye düşündüğümüzde karşımıza table-per-entity, collection-per-entity kavramları çıkıyor.
XU100 endeksi, GARAN hissesi gibi finansal veri akışı olan her bir varlığı kendilerine ait tablolarda, collectionlarda tutmaya başlarsak aşağıdaki gibi bir görüntüyle karşılaşabiliriz.
Artık ortada bir identifier yok. Identifierlar tabloların, collectionların yani finansal varlıkların kendisi olacak.
Bu şekilde veri kaynağında tekrar eden bir veriden(GARAN, XU100 gibi identifier alan) ve enstrüman, finansal varlık bazında sorgulama yapma maliyetinden kurtulmuş oluyoruz, aynı zamanda da varlıklar artık sadece kendilerine ait verileri tutabilecek.
Avantajları
- Filtreleme yapmamız için artık symbol veya başka bir identifier alana ihtiyacımız yok, tablonun veya collection’un kendisi direkt bir varlığa ait.
- Her finansal varlık kendi şemasını tutabilir, bir enstrümana özel alan eklemek, çıkarmak diğer varlıkları etkilemez.
- Her tablo, collection tek varlığa veya varlık grubuna özel olduğu için örnek olarak indeksler daha küçük olabilecek.
- Yazma/okuma izolasyonu daha yönetilebilir, tablo veya collection yazma/okuma lockları ile karşılaşma oranımız artık çok daha düşük.
- Her varlık ayrı shard key ve ayrı shard’lara/cluster segmentlerine dağıtılabilir, çok daha mikro seviyelerde ölçeklendirme ve kaynak ayırma imkanımız var.
- Veri kayıplarında verileri restore etmek veya verileri compact hale getirmek daha kısa sürelerde yapabiliriz ve daha kolay yönetebiliriz.
Mongo
Uygulama —Implementation
Modeller
Bu modeller bize her bir farklı dinamikteki finansal enstrümana ait kayıtlar için genişletilebilir bir şema sunar. IFinancialRecord
tüm kayıtların bir Key
(GARAN, XU100) taşımasını sağlar veBasicFinancialRecord
sadece temel fiyat ve zaman bilgisiniSizedFinancialRecord
bu yapıya işlem büyüklüğünü DetailedFinancialRecord
ise en iyi alış/satış fiyat ve miktar bilgilerini ekleyerek adım adım zenginleşen bir hiyerarşi sunar. Bu sayede her enstrüman türüne ait kendi şemasını oluşturabilir veya varolanları koruyup, genişletilebiliriz.
using System.Text.Json.Serialization;
using MongoDB.Bson.Serialization.Attributes;
namespace PerEntityExample.Models;
public interface IFinancialRecord
{
public string Key { get; }
}
public class BasicFinancialRecord(string identifier, double lastPrice) : IFinancialRecord
{
[BsonIgnore]
public string Key { get; } = identifier;
[BsonElement("price")]
public double LastPrice { get; private set; } = lastPrice;
[BsonElement("lastTradeAt")]
public DateTime LastTradeAt { get; private set; } = DateTime.UtcNow;
}
public class SizedFinancialRecord(string identifier, double lastPrice, double lastSize) : BasicFinancialRecord(identifier, lastPrice)
{
[BsonIgnoreIfNull, BsonElement("lastSize")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public double? LastSize { get; private set; } = lastSize;
}
public class DetailedFinancialRecord(
string identifier,
double lastPrice,
double lastSize,
double ask,
double askSize,
double bid,
double bidSize) : SizedFinancialRecord(identifier, lastPrice, lastSize)
{
[BsonIgnoreIfNull, BsonElement("ask")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public double? Ask { get; private set; } = ask;
[BsonIgnoreIfNull, BsonElement("askSize")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public double? AskSize { get; private set; } = askSize;
[BsonIgnoreIfNull, BsonElement("bid")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public double? Bid { get; private set; } = bid;
[BsonIgnoreIfNull, BsonElement("bidSize")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public double? BidSize { get; private set; } = bidSize;
}
Writerlar
Writerlar her finansal enstrümanı key(GARAN, XU100) bazlı olarak yazmak için tek bir interface kullanmamızı sağlar. Bu interface birden çok varlığın verisini, tekil veya toplu yazmamıza olanak tanır. Böylece yeni bir yazma kaynağı eklemek(pek tercih etmesek de mesela SQL veya tercih edeceğimiz başka bir NoSQL) veya var olanı değiştirmek ayrı ayrı mümkün olur.
using PerEntityExample.Models;
namespace PerEntityExample.Writers;
/// <summary>
/// Key-based writer interface
/// </summary>
public interface IWriter<TMessage> where TMessage : IFinancialRecord
{
/// <summary>
/// Writes multiple groups of messages to their respective destinations, tables, collections in parallel.
/// </summary>
Task WriteAsync(Dictionary<string, List<TMessage>> data, CancellationToken cancellationToken = default);
/// <summary>
/// Writes a batch of messages to the specified destination, table, collection.
/// </summary>
Task WriteAsync(string key, List<TMessage> data, CancellationToken cancellationToken = default);
/// <summary>
/// Writes a single message to the specified destination, table, collection.
/// </summary>
Task WriteAsync(string key, TMessage data, CancellationToken cancellationToken = default);
}
— MongoWriter.cs
using MongoDB.Bson;
using MongoDB.Driver;
using PerEntityExample.Models;
namespace PerEntityExample.Writers;
/// <summary>
/// Key-based mongo writer
/// </summary>
public class MongoWriter<TMessage> : IWriter<TMessage> where TMessage : IFinancialRecord
{
private readonly IMongoDatabase _database;
private readonly Dictionary<string, IMongoCollection<BsonDocument>> _collections = new();
public MongoWriter(string database, string connectionString)
{
var settings = MongoClientSettings.FromUrl(new MongoUrl(connectionString));
settings.MaxConnectionPoolSize = 250;
settings.ConnectTimeout = TimeSpan.FromSeconds(5);
settings.WaitQueueTimeout = TimeSpan.FromSeconds(5);
var mongoClient = new MongoClient(settings);
_database = mongoClient.GetDatabase(database);
}
private readonly InsertManyOptions _insertingOptions = new() { BypassDocumentValidation = true, Comment = null, IsOrdered = false };
private IMongoCollection<BsonDocument> GetCollection(string collectionName)
{
if (_collections.TryGetValue(collectionName, out var collection))
return collection;
collection = _database.GetCollection<BsonDocument>(collectionName);
_collections[collectionName] = collection;
return collection;
}
public async Task WriteAsync(Dictionary<string, List<TMessage>> data, CancellationToken cancellationToken = default)
{
var writeOperations = from groupedTick in data
let collectionName = groupedTick.Key
let collection = GetCollection(collectionName)
let documents = groupedTick.Value.Select(x => x.ToBsonDocument()).ToList()
select new WriteOperation(collection, documents);
var tasks = writeOperations.Select(async op => await op.WriteAsync(_insertingOptions, cancellationToken)).ToArray();
await Task.WhenAll(tasks).ConfigureAwait(false);
}
public async Task WriteAsync(string key, List<TMessage> data, CancellationToken cancellationToken = default)
{
var collection = GetCollection(key);
var documents = data.Select(x => x.ToBsonDocument()).ToList();
await collection.InsertManyAsync(documents, cancellationToken: cancellationToken);
}
public async Task WriteAsync(string key, TMessage data, CancellationToken cancellationToken = default)
{
var collection = GetCollection(key);
var documents = data.ToBsonDocument();
await collection.InsertOneAsync(documents, cancellationToken: cancellationToken);
}
}
public class WriteOperation(IMongoCollection<BsonDocument> collection, IEnumerable<BsonDocument> documents)
{
private IMongoCollection<BsonDocument> Collection { get; } = collection;
private IEnumerable<BsonDocument> Documents { get; } = documents.Select(document =>
{
// remove discriminator
document.Remove("_t");
return document;
});
public async Task WriteAsync(InsertManyOptions options, CancellationToken cancellationToken) =>
await Collection.InsertManyAsync(Documents, options, cancellationToken);
}
— FileWriter.cs
using System.Text.Json;
using PerEntityExample.Models;
namespace PerEntityExample.Writers;
/// <summary>
/// Key-based file writer
/// </summary>
public class FileWriter<TMessage> : IWriter<TMessage> where TMessage : IFinancialRecord
{
private readonly string _baseDirectory;
public FileWriter(string baseDirectory)
{
_baseDirectory = baseDirectory;
var baseDirectoryExist = Path.Exists(_baseDirectory);
if (baseDirectoryExist is false)
Directory.CreateDirectory(_baseDirectory);
}
public async Task WriteAsync(Dictionary<string, List<TMessage>> data, CancellationToken cancellationToken = default)
{
var tasks = data.Select(kvp => WriteAsync(kvp.Key, kvp.Value, cancellationToken));
await Task.WhenAll(tasks).ConfigureAwait(false);
}
public async Task WriteAsync(string key, List<TMessage> data, CancellationToken cancellationToken = default)
{
var filePath = Path.Combine(_baseDirectory, $"{key}.json");
await using var stream = new FileStream(
filePath,
FileMode.Append,
FileAccess.Write,
FileShare.Read,
bufferSize: 4096,
useAsync: true);
await using var writer = new StreamWriter(stream);
foreach (var item in data)
{
var json = JsonSerializer.Serialize(item, item.GetType());
var jsonAsMemory = json.AsMemory();
await writer.WriteLineAsync(jsonAsMemory, cancellationToken);
}
}
public async Task WriteAsync(string key, TMessage data, CancellationToken cancellationToken = default)
{
await WriteAsync(key, [data], cancellationToken).ConfigureAwait(false);
}
}
Program.cs
using PerEntityExample.Models;
using PerEntityExample.Writers;
var indexes = new[] { "XU30", "XU100", "XU500" };
var stocks = new[] { "EKGYO", "GARAN", "ISMEN" };
const string target = "financialRecords";
var writers = new List<IWriter<IFinancialRecord>>
{
new FileWriter<IFinancialRecord>(target),
new MongoWriter<IFinancialRecord>(target, "mongodb://localhost:27017")
};
const int batchSize = 100;
while (true)
{
var buffer = new List<IFinancialRecord>(batchSize);
var recordStreaming = GetFinancialRecordStreamingAsync();
await foreach (var record in recordStreaming)
{
buffer.Add(record);
if (buffer.Count >= batchSize)
{
var groupedBuffer = buffer
.GroupBy(r => r.Key)
.ToDictionary(g => g.Key, g => g.ToList());
var writeTasks = writers.Select(writer => writer.WriteAsync(groupedBuffer)).ToArray();
try
{
await Task.WhenAll(writeTasks).ConfigureAwait(false);
}
catch (Exception e)
{
Console.WriteLine($"Error writing records: {e.Message}");
}
finally
{
buffer.Clear();
groupedBuffer.Clear();
}
}
}
}
async IAsyncEnumerable<IFinancialRecord> GetFinancialRecordStreamingAsync()
{
var random = new Random();
var prices = new Dictionary<string, double>();
foreach (var key in indexes.Concat(stocks))
prices[key] = 10 + random.NextDouble() * 10;
while (true)
{
string key;
var isBasic = false;
if (random.Next(2) == 0)
{
key = stocks[random.Next(stocks.Length)];
}
else
{
key = indexes[random.Next(indexes.Length)];
isBasic = true;
}
var prevPrice = prices[key];
var delta = random.NextDouble() * 0.2 - 0.1;
var price = Math.Max(0, prevPrice + delta);
prices[key] = price;
if (isBasic)
{
yield return new BasicFinancialRecord(key, price);
}
else
{
var size = random.Next(1, 500);
var ask = price + random.NextDouble() * 0.1;
var askSize = random.Next(1, 500);
var bid = price - random.NextDouble() * 0.1;
var bidSize = random.Next(1, 500);
yield return new DetailedFinancialRecord(key, price, size, ask, askSize, bid, bidSize);
}
await Task.Delay(50);
}
}
Sonuç
Bu yaklaşım ile her bir finansal enstrümanın verisi izole edilmiş yapılarda saklayarak yüksek veri hacimlerinde ölçeklenebilirlik elde edebiliriz. Tablo/Collection-per-entity yaklaşımı şema esnekliği, mikro seviyede kaynak ayrımı ve bakım kolaylığı sunar. Sonuç olarak kendi içlerinde farklı dinamiklere sahip finansal veriler için yönetilebilirlik ve genişletilebilirlik ihtiyaçlarını dengeli şekilde karşılayan, sürdürülebilir bir çözüm elde etmiş oluyoruz.