Files
Fishing2Server/Fantasy/Fantays.Console/Runtime/Core/DataBase/MongoDataBase.cs
2025-06-30 10:51:37 +08:00

1081 lines
46 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#if FANTASY_NET
using System.Linq.Expressions;
using Fantasy.Async;
using Fantasy.DataStructure.Collection;
using Fantasy.Entitas;
using Fantasy.Helper;
using Fantasy.Serialize;
using MongoDB.Bson;
using MongoDB.Driver;
#pragma warning disable CS8602 // Dereference of a possibly null reference.
#pragma warning disable CS8603 // Possible null reference return.
#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type.
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
namespace Fantasy.DataBase
{
/// <summary>
/// 使用 MongoDB 数据库的实现。
/// </summary>
public sealed class MongoDataBase : IDataBase
{
private const int DefaultTaskSize = 1024;
private Scene _scene;
private MongoClient _mongoClient;
private ISerialize _serializer;
private IMongoDatabase _mongoDatabase;
private CoroutineLock _dataBaseLock;
private readonly HashSet<string> _collections = new HashSet<string>();
/// <summary>
/// 获得当前数据的类型
/// </summary>
public DataBaseType GetDataBaseType { get; } = DataBaseType.MongoDB;
/// <summary>
/// 获得对应数据的操作实例
/// </summary>
public object GetDataBaseInstance => _mongoDatabase;
/// <summary>
/// 初始化 MongoDB 数据库连接并记录所有集合名。
/// </summary>
/// <param name="scene">场景对象。</param>
/// <param name="connectionString">数据库连接字符串。</param>
/// <param name="dbName">数据库名称。</param>
/// <returns>初始化后的数据库实例。</returns>
public IDataBase Initialize(Scene scene, string connectionString, string dbName)
{
_scene = scene;
_mongoClient = DataBaseSetting.MongoDBCustomInitialize != null
? DataBaseSetting.MongoDBCustomInitialize(new DataBaseCustomConfig()
{
Scene = scene, ConnectionString = connectionString, DBName = dbName
})
: new MongoClient(connectionString);
_mongoDatabase = _mongoClient.GetDatabase(dbName);
_dataBaseLock = scene.CoroutineLockComponent.Create(GetType().TypeHandle.Value.ToInt64());
// 记录所有集合名
_collections.UnionWith(_mongoDatabase.ListCollectionNames().ToList());
_serializer = SerializerManager.GetSerializer(FantasySerializerType.Bson);
return this;
}
/// <summary>
/// 销毁释放资源。
/// </summary>
public void Dispose()
{
// 优先释放协程锁。
_dataBaseLock.Dispose();
// 清理资源。
_scene = null;
_serializer = null;
_mongoDatabase = null;
_dataBaseLock = null;
_collections.Clear();
_mongoClient.Dispose();
}
#region Other
/// <summary>
/// 对满足条件的文档中的某个数值字段进行求和操作。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="filter">用于筛选文档的表达式。</param>
/// <param name="sumExpression">要对其进行求和的字段表达式。</param>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>满足条件的文档中指定字段的求和结果。</returns>
public async FTask<long> Sum<T>(Expression<Func<T, bool>> filter, Expression<Func<T, object>> sumExpression, string collection = null) where T : Entity
{
var member = (MemberExpression)((UnaryExpression)sumExpression.Body).Operand;
var projection = new BsonDocument("_id", "null").Add("Result", new BsonDocument("$sum", $"${member.Member.Name}"));
var data = await GetCollection<T>(collection).Aggregate().Match(filter).Group(projection).FirstOrDefaultAsync();
return data == null ? 0 : Convert.ToInt64(data["Result"]);
}
#endregion
#region GetCollection
/// <summary>
/// 获取指定集合中的 MongoDB 文档的 IMongoCollection 对象。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>IMongoCollection 对象。</returns>
private IMongoCollection<T> GetCollection<T>(string collection = null)
{
return _mongoDatabase.GetCollection<T>(collection ?? typeof(T).Name);
}
/// <summary>
/// 获取指定集合中的 MongoDB 文档的 IMongoCollection 对象,其中实体类型为 Entity。
/// </summary>
/// <param name="name">集合名称。</param>
/// <returns>IMongoCollection 对象。</returns>
private IMongoCollection<Entity> GetCollection(string name)
{
return _mongoDatabase.GetCollection<Entity>(name);
}
#endregion
#region Count
/// <summary>
/// 统计指定集合中满足条件的文档数量。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>满足条件的文档数量。</returns>
public async FTask<long> Count<T>(string collection = null) where T : Entity
{
return await GetCollection<T>(collection).CountDocumentsAsync(d => true);
}
/// <summary>
/// 统计指定集合中满足条件的文档数量。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="filter">用于筛选文档的表达式。</param>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>满足条件的文档数量。</returns>
public async FTask<long> Count<T>(Expression<Func<T, bool>> filter, string collection = null) where T : Entity
{
return await GetCollection<T>(collection).CountDocumentsAsync(filter);
}
#endregion
#region Exist
/// <summary>
/// 判断指定集合中是否存在文档。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>如果存在文档则返回 true否则返回 false。</returns>
public async FTask<bool> Exist<T>(string collection = null) where T : Entity
{
return await Count<T>(collection) > 0;
}
/// <summary>
/// 判断指定集合中是否存在满足条件的文档。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="filter">用于筛选文档的表达式。</param>
/// <param name="collection">集合名称,可选。如果未指定,将使用实体类型的名称。</param>
/// <returns>如果存在满足条件的文档则返回 true否则返回 false。</returns>
public async FTask<bool> Exist<T>(Expression<Func<T, bool>> filter, string collection = null) where T : Entity
{
return await Count(filter, collection) > 0;
}
#endregion
#region Query
/// <summary>
/// 在不加数据库锁定的情况下,查询指定 ID 的文档。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="id">要查询的文档 ID。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>查询到的文档。</returns>
public async FTask<T> QueryNotLock<T>(long id, bool isDeserialize = false, string collection = null) where T : Entity
{
var cursor = await GetCollection<T>(collection).FindAsync(d => d.Id == id);
var v = await cursor.FirstOrDefaultAsync();
if (isDeserialize && v != null)
{
v.Deserialize(_scene);
}
return v;
}
/// <summary>
/// 查询指定 ID 的文档,并加数据库锁定以确保数据一致性。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="id">要查询的文档 ID。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>查询到的文档。</returns>
public async FTask<T> Query<T>(long id, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(id))
{
var cursor = await GetCollection<T>(collection).FindAsync(d => d.Id == id);
var v = await cursor.FirstOrDefaultAsync();
if (isDeserialize && v != null)
{
v.Deserialize(_scene);
}
return v;
}
}
/// <summary>
/// 通过分页查询并返回满足条件的文档数量和日期列表(不加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="pageIndex">页码。</param>
/// <param name="pageSize">每页大小。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档数量和日期列表。</returns>
public async FTask<(int count, List<T> dates)> QueryCountAndDatesByPage<T>(Expression<Func<T, bool>> filter, int pageIndex, int pageSize, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var count = await Count(filter);
var dates = await QueryByPage(filter, pageIndex, pageSize, isDeserialize, collection);
return ((int)count, dates);
}
}
/// <summary>
/// 通过分页查询并返回满足条件的文档数量和日期列表(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="pageIndex">页码。</param>
/// <param name="pageSize">每页大小。</param>
/// <param name="cols">要查询的列名称数组。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档数量和日期列表。</returns>
public async FTask<(int count, List<T> dates)> QueryCountAndDatesByPage<T>(Expression<Func<T, bool>> filter, int pageIndex, int pageSize, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var count = await Count(filter);
var dates = await QueryByPage(filter, pageIndex, pageSize, cols, isDeserialize, collection);
return ((int)count, dates);
}
}
/// <summary>
/// 通过分页查询并返回满足条件的文档列表(不加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="pageIndex">页码。</param>
/// <param name="pageSize">每页大小。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryByPage<T>(Expression<Func<T, bool>> filter, int pageIndex, int pageSize, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var list = await GetCollection<T>(collection).Find(filter).Skip((pageIndex - 1) * pageSize)
.Limit(pageSize)
.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 通过分页查询并返回满足条件的文档列表(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="pageIndex">页码。</param>
/// <param name="pageSize">每页大小。</param>
/// <param name="cols">要查询的列名称数组。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryByPage<T>(Expression<Func<T, bool>> filter, int pageIndex, int pageSize, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders<T>.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var list = await GetCollection<T>(collection).Find(filter).Project<T>(projection)
.Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 通过分页查询并返回满足条件的文档列表,并按指定表达式进行排序(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="pageIndex">页码。</param>
/// <param name="pageSize">每页大小。</param>
/// <param name="orderByExpression">排序表达式。</param>
/// <param name="isAsc">是否升序排序。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryByPageOrderBy<T>(Expression<Func<T, bool>> filter, int pageIndex, int pageSize, Expression<Func<T, object>> orderByExpression, bool isAsc = true, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
List<T> list;
if (isAsc)
{
list = await GetCollection<T>(collection).Find(filter).SortBy(orderByExpression).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
else
{
list = await GetCollection<T>(collection).Find(filter).SortByDescending(orderByExpression).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 通过指定过滤条件查询并返回满足条件的第一个文档(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的第一个文档,如果未找到则为 null。</returns>
public async FTask<T?> First<T>(Expression<Func<T, bool>> filter, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var cursor = await GetCollection<T>(collection).FindAsync(filter);
var t = await cursor.FirstOrDefaultAsync();
if (isDeserialize && t != null)
{
t.Deserialize(_scene);
}
return t;
}
}
/// <summary>
/// 通过指定 JSON 格式查询并返回满足条件的第一个文档(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="json">JSON 查询条件。</param>
/// <param name="cols">要查询的列名称数组。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的第一个文档。</returns>
public async FTask<T> First<T>(string json, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders<T>.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var options = new FindOptions<T, T> { Projection = projection };
FilterDefinition<T> filterDefinition = new JsonFilterDefinition<T>(json);
var cursor = await GetCollection<T>(collection).FindAsync(filterDefinition, options);
var t = await cursor.FirstOrDefaultAsync();
if (isDeserialize && t != null)
{
t.Deserialize(_scene);
}
return t;
}
}
/// <summary>
/// 通过指定过滤条件查询并返回满足条件的文档列表,并按指定表达式进行排序(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="orderByExpression">排序表达式。</param>
/// <param name="isAsc">是否升序排序。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryOrderBy<T>(Expression<Func<T, bool>> filter, Expression<Func<T, object>> orderByExpression, bool isAsc = true, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
List<T> list;
if (isAsc)
{
list = await GetCollection<T>(collection).Find(filter).SortBy(orderByExpression).ToListAsync();
}
else
{
list = await GetCollection<T>(collection).Find(filter).SortByDescending(orderByExpression).ToListAsync();
}
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 通过指定过滤条件查询并返回满足条件的文档列表(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> Query<T>(Expression<Func<T, bool>> filter, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var cursor = await GetCollection<T>(collection).FindAsync(filter);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 根据指定 ID 加锁查询多个集合中的文档。
/// </summary>
/// <param name="id">文档 ID。</param>
/// <param name="collectionNames">要查询的集合名称列表。</param>
/// <param name="result">查询结果存储列表。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
public async FTask Query(long id, List<string>? collectionNames, List<Entity> result, bool isDeserialize = false)
{
using (await _dataBaseLock.Wait(id))
{
if (collectionNames == null || collectionNames.Count == 0)
{
return;
}
foreach (var collectionName in collectionNames)
{
var cursor = await GetCollection(collectionName).FindAsync(d => d.Id == id);
var e = await cursor.FirstOrDefaultAsync();
if (e == null)
{
continue;
}
if (isDeserialize)
{
e.Deserialize(_scene);
}
result.Add(e);
}
}
}
/// <summary>
/// 根据指定的 JSON 查询条件查询并返回满足条件的文档列表(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="json">JSON 查询条件。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryJson<T>(string json, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
FilterDefinition<T> filterDefinition = new JsonFilterDefinition<T>(json);
var cursor = await GetCollection<T>(collection).FindAsync(filterDefinition);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 根据指定的 JSON 查询条件查询并返回满足条件的文档列表,并选择指定的列(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="json">JSON 查询条件。</param>
/// <param name="cols">要查询的列名称数组。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryJson<T>(string json, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders<T>.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var options = new FindOptions<T, T> { Projection = projection };
FilterDefinition<T> filterDefinition = new JsonFilterDefinition<T>(json);
var cursor = await GetCollection<T>(collection).FindAsync(filterDefinition, options);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 根据指定的 JSON 查询条件和任务 ID 查询并返回满足条件的文档列表(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="taskId">任务 ID。</param>
/// <param name="json">JSON 查询条件。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> QueryJson<T>(long taskId, string json, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(taskId))
{
FilterDefinition<T> filterDefinition = new JsonFilterDefinition<T>(json);
var cursor = await GetCollection<T>(collection).FindAsync(filterDefinition);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 根据指定过滤条件查询并返回满足条件的文档列表,选择指定的列(加锁)。
/// </summary>
/// <typeparam name="T">文档实体类型。</typeparam>
/// <param name="filter">查询过滤条件。</param>
/// <param name="cols">要查询的列名称数组。</param>
/// <param name="isDeserialize">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <param name="collection">集合名称。</param>
/// <returns>满足条件的文档列表。</returns>
public async FTask<List<T>> Query<T>(Expression<Func<T, bool>> filter, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders<T>.Projection.Include("_id");
foreach (var t in cols)
{
projection = projection.Include(t);
}
var list = await GetCollection<T>(collection).Find(filter).Project<T>(projection).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
/// <summary>
/// 根据指定过滤条件查询并返回满足条件的文档列表,选择指定的列(加锁)。
/// </summary>
/// <param name="filter">文档实体类型。</param>
/// <param name="cols">查询过滤条件。</param>
/// <param name="isDeserialize">要查询的列名称数组。</param>
/// <param name="collection">是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。</param>
/// <typeparam name="T">集合名称。</typeparam>
/// <returns></returns>
public async FTask<List<T>> Query<T>(Expression<Func<T, bool>> filter, Expression<Func<T, object>>[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders<T>.Projection.Include("_id");
foreach (var col in cols)
{
if (col.Body is not MemberExpression memberExpression)
{
throw new ArgumentException("Lambda expression must be a member access expression.");
}
projection = projection.Include(memberExpression.Member.Name);
}
var list = await GetCollection<T>(collection).Find(filter).Project<T>(projection).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
#endregion
#region Save
/// <summary>
/// 保存实体对象到数据库(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="transactionSession">事务会话对象。</param>
/// <param name="entity">要保存的实体对象。</param>
/// <param name="collection">集合名称。</param>
public async FTask Save<T>(object transactionSession, T? entity, string collection = null) where T : Entity
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection<T>(collection).ReplaceOneAsync(
(IClientSessionHandle)transactionSession, d => d.Id == clone.Id, clone,
new ReplaceOptions { IsUpsert = true });
}
}
/// <summary>
/// 保存实体对象到数据库(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="entity">要保存的实体对象。</param>
/// <param name="collection">集合名称。</param>
public async FTask Save<T>(T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection<T>(collection).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions { IsUpsert = true });
}
}
/// <summary>
/// 保存实体对象到数据库(加锁)。
/// </summary>
/// <param name="filter">保存的条件表达式。</param>
/// <param name="entity">实体类型。</param>
/// <param name="collection">集合名称。</param>
/// <typeparam name="T"></typeparam>
public async FTask Save<T>(Expression<Func<T, bool>> filter, T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
T clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection<T>(collection).ReplaceOneAsync<T>(filter, clone, new ReplaceOptions { IsUpsert = true });
}
}
/// <summary>
/// 保存多个实体对象到数据库(加锁)。
/// </summary>
/// <param name="id">文档 ID。</param>
/// <param name="entities">要保存的实体对象列表。</param>
public async FTask Save(long id, List<Entity>? entities)
{
if (entities == null || entities.Count == 0)
{
Log.Error("save entity is null");
return;
}
using var listPool = ListPool<Entity>.Create();
foreach (var entity in entities)
{
listPool.Add(_serializer.Clone(entity));
}
using (await _dataBaseLock.Wait(id))
{
foreach (var clone in listPool)
{
try
{
await GetCollection(clone.GetType().Name).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions { IsUpsert = true });
}
catch (Exception e)
{
Log.Error($"Save List Entity Error: {clone.GetType().Name} {clone}\n{e}");
}
}
}
}
#endregion
#region Insert
/// <summary>
/// 插入单个实体对象到数据库(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="entity">要插入的实体对象。</param>
/// <param name="collection">集合名称。</param>
public async FTask Insert<T>(T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"insert entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(entity.Id))
{
await GetCollection<T>(collection).InsertOneAsync(clone);
}
}
/// <summary>
/// 批量插入实体对象列表到数据库(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="list">要插入的实体对象列表。</param>
/// <param name="collection">集合名称。</param>
public async FTask InsertBatch<T>(IEnumerable<T> list, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
await GetCollection<T>(collection).InsertManyAsync(list);
}
}
/// <summary>
/// 批量插入实体对象列表到数据库(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="transactionSession">事务会话对象。</param>
/// <param name="list">要插入的实体对象列表。</param>
/// <param name="collection">集合名称。</param>
public async FTask InsertBatch<T>(object transactionSession, IEnumerable<T> list, string collection = null)
where T : Entity, new()
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
await GetCollection<T>(collection).InsertManyAsync((IClientSessionHandle)transactionSession, list);
}
}
/// <summary>
/// 插入BsonDocument到数据库加锁
/// </summary>
/// <param name="bsonDocument"></param>
/// <param name="taskId"></param>
/// <typeparam name="T"></typeparam>
public async Task Insert<T>(BsonDocument bsonDocument, long taskId) where T : Entity
{
using (await _dataBaseLock.Wait(taskId))
{
await GetCollection<BsonDocument>(typeof(T).Name).InsertOneAsync(bsonDocument);
}
}
#endregion
#region Remove
/// <summary>
/// 根据ID删除单个实体对象加锁
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="transactionSession">事务会话对象。</param>
/// <param name="id">要删除的实体的ID。</param>
/// <param name="collection">集合名称。</param>
/// <returns>删除的实体数量。</returns>
public async FTask<long> Remove<T>(object transactionSession, long id, string collection = null)
where T : Entity, new()
{
using (await _dataBaseLock.Wait(id))
{
var result = await GetCollection<T>(collection)
.DeleteOneAsync((IClientSessionHandle)transactionSession, d => d.Id == id);
return result.DeletedCount;
}
}
/// <summary>
/// 根据ID删除单个实体对象加锁
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="id">要删除的实体的ID。</param>
/// <param name="collection">集合名称。</param>
/// <returns>删除的实体数量。</returns>
public async FTask<long> Remove<T>(long id, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(id))
{
var result = await GetCollection<T>(collection).DeleteOneAsync(d => d.Id == id);
return result.DeletedCount;
}
}
/// <summary>
/// 根据ID和筛选条件删除多个实体对象加锁
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="coroutineLockQueueKey">异步锁Id。</param>
/// <param name="transactionSession">事务会话对象。</param>
/// <param name="filter">筛选条件。</param>
/// <param name="collection">集合名称。</param>
/// <returns>删除的实体数量。</returns>
public async FTask<long> Remove<T>(long coroutineLockQueueKey, object transactionSession,
Expression<Func<T, bool>> filter, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(coroutineLockQueueKey))
{
var result = await GetCollection<T>(collection)
.DeleteManyAsync((IClientSessionHandle)transactionSession, filter);
return result.DeletedCount;
}
}
/// <summary>
/// 根据ID和筛选条件删除多个实体对象加锁
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="coroutineLockQueueKey">异步锁Id。</param>
/// <param name="filter">筛选条件。</param>
/// <param name="collection">集合名称。</param>
/// <returns>删除的实体数量。</returns>
public async FTask<long> Remove<T>(long coroutineLockQueueKey, Expression<Func<T, bool>> filter,
string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(coroutineLockQueueKey))
{
var result = await GetCollection<T>(collection).DeleteManyAsync(filter);
return result.DeletedCount;
}
}
#endregion
#region Index
/// <summary>
/// 创建数据库索引(加锁)。
/// </summary>
/// <param name="collection"></param>
/// <param name="keys"></param>
/// <typeparam name="T"></typeparam>
/// <code>
/// 使用例子(可多个):
/// 1 : Builders.IndexKeys.Ascending(d=>d.Id)
/// 2 : Builders.IndexKeys.Descending(d=>d.Id).Ascending(d=>d.Name)
/// 3 : Builders.IndexKeys.Descending(d=>d.Id),Builders.IndexKeys.Descending(d=>d.Name)
/// </code>
public async FTask CreateIndex<T>(string collection, params object[]? keys) where T : Entity
{
if (keys == null || keys.Length <= 0)
{
return;
}
var indexModels = new List<CreateIndexModel<T>>();
foreach (object key in keys)
{
IndexKeysDefinition<T> indexKeysDefinition = (IndexKeysDefinition<T>)key;
indexModels.Add(new CreateIndexModel<T>(indexKeysDefinition));
}
await GetCollection<T>(collection).Indexes.CreateManyAsync(indexModels);
}
/// <summary>
/// 创建数据库的索引(加锁)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
/// <param name="keys">索引键定义。</param>
public async FTask CreateIndex<T>(params object[]? keys) where T : Entity
{
if (keys == null)
{
return;
}
List<CreateIndexModel<T>> indexModels = new List<CreateIndexModel<T>>();
foreach (object key in keys)
{
IndexKeysDefinition<T> indexKeysDefinition = (IndexKeysDefinition<T>)key;
indexModels.Add(new CreateIndexModel<T>(indexKeysDefinition));
}
await GetCollection<T>().Indexes.CreateManyAsync(indexModels);
}
#endregion
#region CreateDB
/// <summary>
/// 创建数据库集合(如果不存在)。
/// </summary>
/// <typeparam name="T">实体类型。</typeparam>
public async FTask CreateDB<T>() where T : Entity
{
// 已经存在数据库表
string name = typeof(T).Name;
if (_collections.Contains(name))
{
return;
}
await _mongoDatabase.CreateCollectionAsync(name);
_collections.Add(name);
}
/// <summary>
/// 创建数据库集合(如果不存在)。
/// </summary>
/// <param name="type">实体类型。</param>
public async FTask CreateDB(Type type)
{
string name = type.Name;
if (_collections.Contains(name))
{
return;
}
await _mongoDatabase.CreateCollectionAsync(name);
_collections.Add(name);
}
#endregion
}
}
#endif