#if FANTASY_NET
using System.Linq.Expressions;
using Fantasy.Async;
using Fantasy.DataStructure.Collection;
using Fantasy.Entitas;
using Fantasy.Helper;
using Fantasy.Serialize;
using MongoDB.Bson;
using MongoDB.Driver;
#pragma warning disable CS8602 // Dereference of a possibly null reference.
#pragma warning disable CS8603 // Possible null reference return.
#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type.
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
namespace Fantasy.DataBase
{
///
/// 使用 MongoDB 数据库的实现。
///
public sealed class MongoDataBase : IDataBase
{
private const int DefaultTaskSize = 1024;
private Scene _scene;
private MongoClient _mongoClient;
private ISerialize _serializer;
private IMongoDatabase _mongoDatabase;
private CoroutineLock _dataBaseLock;
private readonly HashSet _collections = new HashSet();
///
/// 获得当前数据的类型
///
public DataBaseType GetDataBaseType { get; } = DataBaseType.MongoDB;
///
/// 获得对应数据的操作实例
///
public object GetDataBaseInstance => _mongoDatabase;
///
/// 初始化 MongoDB 数据库连接并记录所有集合名。
///
/// 场景对象。
/// 数据库连接字符串。
/// 数据库名称。
/// 初始化后的数据库实例。
public IDataBase Initialize(Scene scene, string connectionString, string dbName)
{
_scene = scene;
_mongoClient = DataBaseSetting.MongoDBCustomInitialize != null
? DataBaseSetting.MongoDBCustomInitialize(new DataBaseCustomConfig()
{
Scene = scene, ConnectionString = connectionString, DBName = dbName
})
: new MongoClient(connectionString);
_mongoDatabase = _mongoClient.GetDatabase(dbName);
_dataBaseLock = scene.CoroutineLockComponent.Create(GetType().TypeHandle.Value.ToInt64());
// 记录所有集合名
_collections.UnionWith(_mongoDatabase.ListCollectionNames().ToList());
_serializer = SerializerManager.GetSerializer(FantasySerializerType.Bson);
return this;
}
///
/// 销毁释放资源。
///
public void Dispose()
{
// 优先释放协程锁。
_dataBaseLock.Dispose();
// 清理资源。
_scene = null;
_serializer = null;
_mongoDatabase = null;
_dataBaseLock = null;
_collections.Clear();
_mongoClient.Dispose();
}
#region Other
///
/// 对满足条件的文档中的某个数值字段进行求和操作。
///
/// 实体类型。
/// 用于筛选文档的表达式。
/// 要对其进行求和的字段表达式。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// 满足条件的文档中指定字段的求和结果。
public async FTask Sum(Expression> filter, Expression> sumExpression, string collection = null) where T : Entity
{
var member = (MemberExpression)((UnaryExpression)sumExpression.Body).Operand;
var projection = new BsonDocument("_id", "null").Add("Result", new BsonDocument("$sum", $"${member.Member.Name}"));
var data = await GetCollection(collection).Aggregate().Match(filter).Group(projection).FirstOrDefaultAsync();
return data == null ? 0 : Convert.ToInt64(data["Result"]);
}
#endregion
#region GetCollection
///
/// 获取指定集合中的 MongoDB 文档的 IMongoCollection 对象。
///
/// 实体类型。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// IMongoCollection 对象。
private IMongoCollection GetCollection(string collection = null)
{
return _mongoDatabase.GetCollection(collection ?? typeof(T).Name);
}
///
/// 获取指定集合中的 MongoDB 文档的 IMongoCollection 对象,其中实体类型为 Entity。
///
/// 集合名称。
/// IMongoCollection 对象。
private IMongoCollection GetCollection(string name)
{
return _mongoDatabase.GetCollection(name);
}
#endregion
#region Count
///
/// 统计指定集合中满足条件的文档数量。
///
/// 实体类型。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// 满足条件的文档数量。
public async FTask Count(string collection = null) where T : Entity
{
return await GetCollection(collection).CountDocumentsAsync(d => true);
}
///
/// 统计指定集合中满足条件的文档数量。
///
/// 实体类型。
/// 用于筛选文档的表达式。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// 满足条件的文档数量。
public async FTask Count(Expression> filter, string collection = null) where T : Entity
{
return await GetCollection(collection).CountDocumentsAsync(filter);
}
#endregion
#region Exist
///
/// 判断指定集合中是否存在文档。
///
/// 实体类型。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// 如果存在文档则返回 true,否则返回 false。
public async FTask Exist(string collection = null) where T : Entity
{
return await Count(collection) > 0;
}
///
/// 判断指定集合中是否存在满足条件的文档。
///
/// 实体类型。
/// 用于筛选文档的表达式。
/// 集合名称,可选。如果未指定,将使用实体类型的名称。
/// 如果存在满足条件的文档则返回 true,否则返回 false。
public async FTask Exist(Expression> filter, string collection = null) where T : Entity
{
return await Count(filter, collection) > 0;
}
#endregion
#region Query
///
/// 在不加数据库锁定的情况下,查询指定 ID 的文档。
///
/// 文档实体类型。
/// 要查询的文档 ID。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 查询到的文档。
public async FTask QueryNotLock(long id, bool isDeserialize = false, string collection = null) where T : Entity
{
var cursor = await GetCollection(collection).FindAsync(d => d.Id == id);
var v = await cursor.FirstOrDefaultAsync();
if (isDeserialize && v != null)
{
v.Deserialize(_scene);
}
return v;
}
///
/// 查询指定 ID 的文档,并加数据库锁定以确保数据一致性。
///
/// 文档实体类型。
/// 要查询的文档 ID。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 查询到的文档。
public async FTask Query(long id, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(id))
{
var cursor = await GetCollection(collection).FindAsync(d => d.Id == id);
var v = await cursor.FirstOrDefaultAsync();
if (isDeserialize && v != null)
{
v.Deserialize(_scene);
}
return v;
}
}
///
/// 通过分页查询并返回满足条件的文档数量和日期列表(不加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 页码。
/// 每页大小。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档数量和日期列表。
public async FTask<(int count, List dates)> QueryCountAndDatesByPage(Expression> filter, int pageIndex, int pageSize, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var count = await Count(filter);
var dates = await QueryByPage(filter, pageIndex, pageSize, isDeserialize, collection);
return ((int)count, dates);
}
}
///
/// 通过分页查询并返回满足条件的文档数量和日期列表(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 页码。
/// 每页大小。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档数量和日期列表。
public async FTask<(int count, List dates)> QueryCountAndDatesByPage(Expression> filter, int pageIndex, int pageSize, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var count = await Count(filter);
var dates = await QueryByPage(filter, pageIndex, pageSize, cols, isDeserialize, collection);
return ((int)count, dates);
}
}
///
/// 通过分页查询并返回满足条件的文档列表(不加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 页码。
/// 每页大小。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryByPage(Expression> filter, int pageIndex, int pageSize, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var list = await GetCollection(collection).Find(filter).Skip((pageIndex - 1) * pageSize)
.Limit(pageSize)
.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 通过分页查询并返回满足条件的文档列表(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 页码。
/// 每页大小。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryByPage(Expression> filter, int pageIndex, int pageSize, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var list = await GetCollection(collection).Find(filter).Project(projection)
.Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 通过分页查询并返回满足条件的文档列表,并按指定表达式进行排序(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 页码。
/// 每页大小。
/// 排序表达式。
/// 是否升序排序。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryByPageOrderBy(Expression> filter, int pageIndex, int pageSize, Expression> orderByExpression, bool isAsc = true, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
List list;
if (isAsc)
{
list = await GetCollection(collection).Find(filter).SortBy(orderByExpression).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
else
{
list = await GetCollection(collection).Find(filter).SortByDescending(orderByExpression).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 通过指定过滤条件查询并返回满足条件的第一个文档(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的第一个文档,如果未找到则为 null。
public async FTask First(Expression> filter, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var cursor = await GetCollection(collection).FindAsync(filter);
var t = await cursor.FirstOrDefaultAsync();
if (isDeserialize && t != null)
{
t.Deserialize(_scene);
}
return t;
}
}
///
/// 通过指定 JSON 格式查询并返回满足条件的第一个文档(加锁)。
///
/// 文档实体类型。
/// JSON 查询条件。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的第一个文档。
public async FTask First(string json, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var options = new FindOptions { Projection = projection };
FilterDefinition filterDefinition = new JsonFilterDefinition(json);
var cursor = await GetCollection(collection).FindAsync(filterDefinition, options);
var t = await cursor.FirstOrDefaultAsync();
if (isDeserialize && t != null)
{
t.Deserialize(_scene);
}
return t;
}
}
///
/// 通过指定过滤条件查询并返回满足条件的文档列表,并按指定表达式进行排序(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 排序表达式。
/// 是否升序排序。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryOrderBy(Expression> filter, Expression> orderByExpression, bool isAsc = true, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
List list;
if (isAsc)
{
list = await GetCollection(collection).Find(filter).SortBy(orderByExpression).ToListAsync();
}
else
{
list = await GetCollection(collection).Find(filter).SortByDescending(orderByExpression).ToListAsync();
}
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 通过指定过滤条件查询并返回满足条件的文档列表(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> Query(Expression> filter, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var cursor = await GetCollection(collection).FindAsync(filter);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 根据指定 ID 加锁查询多个集合中的文档。
///
/// 文档 ID。
/// 要查询的集合名称列表。
/// 查询结果存储列表。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
public async FTask Query(long id, List? collectionNames, List result, bool isDeserialize = false)
{
using (await _dataBaseLock.Wait(id))
{
if (collectionNames == null || collectionNames.Count == 0)
{
return;
}
foreach (var collectionName in collectionNames)
{
var cursor = await GetCollection(collectionName).FindAsync(d => d.Id == id);
var e = await cursor.FirstOrDefaultAsync();
if (e == null)
{
continue;
}
if (isDeserialize)
{
e.Deserialize(_scene);
}
result.Add(e);
}
}
}
///
/// 根据指定的 JSON 查询条件查询并返回满足条件的文档列表(加锁)。
///
/// 文档实体类型。
/// JSON 查询条件。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryJson(string json, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
FilterDefinition filterDefinition = new JsonFilterDefinition(json);
var cursor = await GetCollection(collection).FindAsync(filterDefinition);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 根据指定的 JSON 查询条件查询并返回满足条件的文档列表,并选择指定的列(加锁)。
///
/// 文档实体类型。
/// JSON 查询条件。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryJson(string json, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders.Projection.Include("");
foreach (var col in cols)
{
projection = projection.Include(col);
}
var options = new FindOptions { Projection = projection };
FilterDefinition filterDefinition = new JsonFilterDefinition(json);
var cursor = await GetCollection(collection).FindAsync(filterDefinition, options);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 根据指定的 JSON 查询条件和任务 ID 查询并返回满足条件的文档列表(加锁)。
///
/// 文档实体类型。
/// 任务 ID。
/// JSON 查询条件。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> QueryJson(long taskId, string json, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(taskId))
{
FilterDefinition filterDefinition = new JsonFilterDefinition(json);
var cursor = await GetCollection(collection).FindAsync(filterDefinition);
var list = await cursor.ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 根据指定过滤条件查询并返回满足条件的文档列表,选择指定的列(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
/// 满足条件的文档列表。
public async FTask> Query(Expression> filter, string[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders.Projection.Include("_id");
foreach (var t in cols)
{
projection = projection.Include(t);
}
var list = await GetCollection(collection).Find(filter).Project(projection).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
///
/// 根据指定过滤条件查询并返回满足条件的文档列表,选择指定的列(加锁)。
///
/// 文档实体类型。
/// 查询过滤条件。
/// 要查询的列名称数组。
/// 是否在查询后反序列化,执行反序列化后会自动将实体注册到框架系统中,并且能正常使用组件相关功能。
/// 集合名称。
///
public async FTask> Query(Expression> filter, Expression>[] cols, bool isDeserialize = false, string collection = null) where T : Entity
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
var projection = Builders.Projection.Include("_id");
foreach (var col in cols)
{
if (col.Body is not MemberExpression memberExpression)
{
throw new ArgumentException("Lambda expression must be a member access expression.");
}
projection = projection.Include(memberExpression.Member.Name);
}
var list = await GetCollection(collection).Find(filter).Project(projection).ToListAsync();
if (!isDeserialize || list is not { Count: > 0 })
{
return list;
}
foreach (var entity in list)
{
entity.Deserialize(_scene);
}
return list;
}
}
#endregion
#region Save
///
/// 保存实体对象到数据库(加锁)。
///
/// 实体类型。
/// 事务会话对象。
/// 要保存的实体对象。
/// 集合名称。
public async FTask Save(object transactionSession, T? entity, string collection = null) where T : Entity
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection(collection).ReplaceOneAsync(
(IClientSessionHandle)transactionSession, d => d.Id == clone.Id, clone,
new ReplaceOptions { IsUpsert = true });
}
}
///
/// 保存实体对象到数据库(加锁)。
///
/// 实体类型。
/// 要保存的实体对象。
/// 集合名称。
public async FTask Save(T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection(collection).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions { IsUpsert = true });
}
}
///
/// 保存实体对象到数据库(加锁)。
///
/// 保存的条件表达式。
/// 实体类型。
/// 集合名称。
///
public async FTask Save(Expression> filter, T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"save entity is null: {typeof(T).Name}");
return;
}
T clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(clone.Id))
{
await GetCollection(collection).ReplaceOneAsync(filter, clone, new ReplaceOptions { IsUpsert = true });
}
}
///
/// 保存多个实体对象到数据库(加锁)。
///
/// 文档 ID。
/// 要保存的实体对象列表。
public async FTask Save(long id, List? entities)
{
if (entities == null || entities.Count == 0)
{
Log.Error("save entity is null");
return;
}
using var listPool = ListPool.Create();
foreach (var entity in entities)
{
listPool.Add(_serializer.Clone(entity));
}
using (await _dataBaseLock.Wait(id))
{
foreach (var clone in listPool)
{
try
{
await GetCollection(clone.GetType().Name).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions { IsUpsert = true });
}
catch (Exception e)
{
Log.Error($"Save List Entity Error: {clone.GetType().Name} {clone}\n{e}");
}
}
}
}
#endregion
#region Insert
///
/// 插入单个实体对象到数据库(加锁)。
///
/// 实体类型。
/// 要插入的实体对象。
/// 集合名称。
public async FTask Insert(T? entity, string collection = null) where T : Entity, new()
{
if (entity == null)
{
Log.Error($"insert entity is null: {typeof(T).Name}");
return;
}
var clone = _serializer.Clone(entity);
using (await _dataBaseLock.Wait(entity.Id))
{
await GetCollection(collection).InsertOneAsync(clone);
}
}
///
/// 批量插入实体对象列表到数据库(加锁)。
///
/// 实体类型。
/// 要插入的实体对象列表。
/// 集合名称。
public async FTask InsertBatch(IEnumerable list, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
await GetCollection(collection).InsertManyAsync(list);
}
}
///
/// 批量插入实体对象列表到数据库(加锁)。
///
/// 实体类型。
/// 事务会话对象。
/// 要插入的实体对象列表。
/// 集合名称。
public async FTask InsertBatch(object transactionSession, IEnumerable list, string collection = null)
where T : Entity, new()
{
using (await _dataBaseLock.Wait(RandomHelper.RandInt64() % DefaultTaskSize))
{
await GetCollection(collection).InsertManyAsync((IClientSessionHandle)transactionSession, list);
}
}
///
/// 插入BsonDocument到数据库(加锁)。
///
///
///
///
public async Task Insert(BsonDocument bsonDocument, long taskId) where T : Entity
{
using (await _dataBaseLock.Wait(taskId))
{
await GetCollection(typeof(T).Name).InsertOneAsync(bsonDocument);
}
}
#endregion
#region Remove
///
/// 根据ID删除单个实体对象(加锁)。
///
/// 实体类型。
/// 事务会话对象。
/// 要删除的实体的ID。
/// 集合名称。
/// 删除的实体数量。
public async FTask Remove(object transactionSession, long id, string collection = null)
where T : Entity, new()
{
using (await _dataBaseLock.Wait(id))
{
var result = await GetCollection(collection)
.DeleteOneAsync((IClientSessionHandle)transactionSession, d => d.Id == id);
return result.DeletedCount;
}
}
///
/// 根据ID删除单个实体对象(加锁)。
///
/// 实体类型。
/// 要删除的实体的ID。
/// 集合名称。
/// 删除的实体数量。
public async FTask Remove(long id, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(id))
{
var result = await GetCollection(collection).DeleteOneAsync(d => d.Id == id);
return result.DeletedCount;
}
}
///
/// 根据ID和筛选条件删除多个实体对象(加锁)。
///
/// 实体类型。
/// 异步锁Id。
/// 事务会话对象。
/// 筛选条件。
/// 集合名称。
/// 删除的实体数量。
public async FTask Remove(long coroutineLockQueueKey, object transactionSession,
Expression> filter, string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(coroutineLockQueueKey))
{
var result = await GetCollection(collection)
.DeleteManyAsync((IClientSessionHandle)transactionSession, filter);
return result.DeletedCount;
}
}
///
/// 根据ID和筛选条件删除多个实体对象(加锁)。
///
/// 实体类型。
/// 异步锁Id。
/// 筛选条件。
/// 集合名称。
/// 删除的实体数量。
public async FTask Remove(long coroutineLockQueueKey, Expression> filter,
string collection = null) where T : Entity, new()
{
using (await _dataBaseLock.Wait(coroutineLockQueueKey))
{
var result = await GetCollection(collection).DeleteManyAsync(filter);
return result.DeletedCount;
}
}
#endregion
#region Index
///
/// 创建数据库索引(加锁)。
///
///
///
///
///
/// 使用例子(可多个):
/// 1 : Builders.IndexKeys.Ascending(d=>d.Id)
/// 2 : Builders.IndexKeys.Descending(d=>d.Id).Ascending(d=>d.Name)
/// 3 : Builders.IndexKeys.Descending(d=>d.Id),Builders.IndexKeys.Descending(d=>d.Name)
///
public async FTask CreateIndex(string collection, params object[]? keys) where T : Entity
{
if (keys == null || keys.Length <= 0)
{
return;
}
var indexModels = new List>();
foreach (object key in keys)
{
IndexKeysDefinition indexKeysDefinition = (IndexKeysDefinition)key;
indexModels.Add(new CreateIndexModel(indexKeysDefinition));
}
await GetCollection(collection).Indexes.CreateManyAsync(indexModels);
}
///
/// 创建数据库的索引(加锁)。
///
/// 实体类型。
/// 索引键定义。
public async FTask CreateIndex(params object[]? keys) where T : Entity
{
if (keys == null)
{
return;
}
List> indexModels = new List>();
foreach (object key in keys)
{
IndexKeysDefinition indexKeysDefinition = (IndexKeysDefinition)key;
indexModels.Add(new CreateIndexModel(indexKeysDefinition));
}
await GetCollection().Indexes.CreateManyAsync(indexModels);
}
#endregion
#region CreateDB
///
/// 创建数据库集合(如果不存在)。
///
/// 实体类型。
public async FTask CreateDB() where T : Entity
{
// 已经存在数据库表
string name = typeof(T).Name;
if (_collections.Contains(name))
{
return;
}
await _mongoDatabase.CreateCollectionAsync(name);
_collections.Add(name);
}
///
/// 创建数据库集合(如果不存在)。
///
/// 实体类型。
public async FTask CreateDB(Type type)
{
string name = type.Name;
if (_collections.Contains(name))
{
return;
}
await _mongoDatabase.CreateCollectionAsync(name);
_collections.Add(name);
}
#endregion
}
}
#endif