OSS 事件 потока (OSS.Pipeline)
На основе управления бизнес-процессами с помощью BPM разработан облегчённый фреймворк для бизнес-процессов, который представляет собой базовую структуру для управления жизненным циклом и событиями в бизнес-домене.
В этом фреймворке функции управления процессами и событиями абстрагированы от объектов бизнес-домена, а также разорваны цепочки вызовов функций событий. Управление процессами централизовано в механизме процессов, а события обрабатываются как независимые единицы. Механизм процессов отвечает за запуск событий и передачу сообщений, обеспечивая изоляцию обработки событийных единиц.
Благодаря этому процессы становятся модулями, которые можно независимо программировать, и предоставляют возможность расширения бизнес-действий на верхнем уровне, гарантируя абсолютную независимость и возможность повторного использования бизнес-единиц. Цель состоит в том, чтобы обеспечить интеграцию различных функциональных кодов подобно сборке блоков, способствуя переходу к платформе с низким уровнем кода.
Если рассматривать весь бизнес-процесс как конвейер, то этот механизм процессов абстрагирует три основных компонента конвейера:
Компонент событийной активности: Этот компонент предназначен для обработки конкретных задач, таких как отправка SMS, оформление заказов, уменьшение запасов и другие реальные бизнес-операции.
Компонент шлюза: Этот компонент отвечает за логическую обработку направлений бизнес-процессов, включая ветвление и объединение процессов.
Компонент потока сообщений: Этот компонент занимается передачей и преобразованием сообщений между другими компонентами.
Компонент событийной активности
Этот компонент реализует конкретную логику бизнес-функций, включая связанные автоматические действия, прерывания по событиям (например, пользовательские триггеры или сообщения из очереди) и другие действия.
Основываясь на активном/пассивном характере ситуаций и учитывая отношения между текущим действием и контекстом следующего узла, предоставляются четыре базовых класса действий (всего семь):
/// <summary>
/// Конкретный метод расширения
/// </summary>
/// <returns>
```
protected abstract Task<TrafficSignal> Executing();
/// <summary>
/// Конкретный метод расширения
/// </summary>
/// <returns>
```
protected abstract Task<TrafficSignal<TRes>> Executing();
```
* BasePassiveActivity<TPara, TRes>, BasePassiveActivity<TPara, TRes,TOut> — пассивные компоненты действий, запускаемые по запросу. Когда поток достигает этого компонента, процесс останавливается, ожидая вызова метода Execute внешним узлом. После вызова поток продолжает движение к следующему узлу (параметр вызова становится новым контекстом). Эти классы наследуются и метод Executing переопределяется для реализации конкретных бизнес-логик.
```csharp
/// <summary>
```
/// Конкретный метод расширения
/// ```
/// <param name="para">Текущий контекст действия</param>
/// <returns>
```
protected abstract Task<TrafficSignal<TRes>> Executing(TPara para);
```
* BasePassiveEffectActivity<TPara, TRes> — пассивный компонент действий с влиянием на контекст, аналогичный активному компоненту действий с влиянием на контекст. Результат метода Executing становится контекстом для следующего узла.
Эти четыре класса действий включают метод Execute, который позволяет запускать действия непосредственно из любого узла. ### 4. BaseMsgSubscriber<TMsg> - 消息订阅者组件 (предоставляет реализацию по умолчанию: MsgSubscriber<TMsg>)
Ранее компонент предоставлял способ **подписки** на данные, источник триггера можно было легко изменить на очередь или базу данных (подробности см. в OSS.DataFlow [https://github.com/KevinWG/OSS.DataFlow]).
### Простой пример сценария
Предположим, что у нас есть сценарий управления запасами, который включает в себя несколько этапов: заявка на поставку, утверждение заявки, покупка и оплата, а также размещение на складе (с одновременным уведомлением заявителя по электронной почте). Каждый этап представляет собой событие-активность, например, заявка может быть определена следующим образом:
```csharp
public class ApplyActivity : BaseEffectActivity<ApplyContext, long>
{
public ApplyActivity()
{
PipeCode = "ApplyActivity";
}
protected override Task<TrafficSignal<long>> Executing(ApplyContext para)
{
LogHelper.Info($"发起 [{para.name}] 采购申请");
return Task.FromResult(new TrafficSignal<long>(100000001L));
}
}
Мы устанавливаем автоматическое выполнение заявки после утверждения, ожидая оплаты (пассивный тип). Мы определяем обработку для остальных этапов аналогичным образом, список выглядит следующим образом:
ApplyActivity - заявка (параметр: ApplyContext)
AutoAuditActivity - утверждение (параметр: long)
PayActivity - покупка (параметр: PayContext)
StockActivity - размещение (параметр: StockContext)
EmailActivity - отправка электронного письма (параметр: SendEmailContext)
Пять событий-активностей имеют независимые реализации и параметры. Поскольку покупка и оплата являются независимыми событиями, мы определяем разветвляющий шлюз для обработки разветвления (правило), код выглядит следующим образом:
public class PayGateway : BaseBranchGateway<PayContext>
{
public PayGateway()
{
PipeCode = "PayGateway";
}
protected override bool FilterBranchCondition(PayContext branchContext, IPipe branch, string prePipeCode)
{
LogHelper.Info($"通过{PipeCode} суждение о том, удовлетворяет ли ветвь {branch.PipeCode} условию разветвления!");
return base.FilterBranchCondition(branchContext, branch, prePipeCode);
}
}
Здесь смысл относительно прост: все входящие ветви не фильтруются, а просто передаются напрямую.
Поскольку методы и параметры пяти событий не полностью совпадают, мы добавляем соединитель сообщений для преобразования и передачи сообщений (также можно выразить обработку во время создания потока), используя оплату в качестве примера преобразования параметров в сообщение:
public class PayEmailConnector : BaseMsgConverter<PayContext, SendEmailContext>
{
public PayEmailConnector()
{
PipeCode = "PayEmailConnector";
}
protected override SendEmailContext Convert(PayContext inContextData)
{
// ......
return new SendEmailContext() { body = $" 您成功支付了订单,总额:{inContextData.money}" };
}
}
Таким образом, определение процесса закупки завершено, и компоненты связаны следующим образом (здесь это класс модульного тестирования, в реальной бизнес-среде мы можем создать службу для обработки):
[TestClass]
public class BuyFlowTests
{
public readonly ApplyActivity ApplyActivity = new ApplyActivity();
public readonly AutoAuditActivity AuditActivity = new AutoAuditActivity();
public readonly PayActivity PayActivity = new PayActivity();
public readonly PayGateway PayGateway = new PayGateway();
public readonly StockConnector StockConnector = new StockConnector();
public readonly StockActivity StockActivity = new StockActivity();
public readonly PayEmailConnector EmailConnector = new PayEmailConnector();
public readonly SendEmailActivity EmailActivity = new SendEmailActivity();
private EndGateway _endNode = new EndGateway();
// Конструктор определяет связь с потоком
public BuyFlowTests()
{
ApplyActivity
.Append(AuditActivity)
.Append(PayActivity)
.Append(PayGateway);
// Разветвляющий шлюз - ветка отправки электронной почты
PayGateway
.Append(EmailConnector)
.Append(EmailActivity)
.Append(_endNode);
// Разветвляющий шлюз — ветка размещения на складе
PayGateway
.Append(StockConnector)
.Append(StockActivity)
.Append(_endNode);
}
[TestMethod]
public async Task FlowTest()
{
await ApplyActivity.Execute(new ApplyContext()
{
name = "холодильник"
});
// Задержка на одну секунду, имитирующая операцию оплаты
await Task.Delay(1000);
await PayActivity.Execute(new PayContext()
{
count = 10,
money = 10000
});
await Task.Delay(1000);// Ожидание завершения асинхронного журнала
}
[TestMethod]
``` ```
public void RouteTest()
{
var TestPipeline = new Pipeline<ApplyContext, Empty>("test-flow", ApplyActivity, _endNode);
var route = TestPipeline.ToRoute();
Assert.IsTrue(route != null);
}
Запуск юнит-теста, результаты:
xxxxxx 17:46:15 — Detail: через ApplyActivity инициирована заявка на закупку из «холодильника».
xxxxxx 17:46:15 — Detail: заявка (номер 100000001) автоматически одобрена через AuditActivity.
xxxxxx 17:46:16 — Detail: выполнено действие PayActivity по оплате, количество: 10, сумма: 10 000.
xxxxxx 17:46:16 — Detail: с помощью PayGateway проверяется, удовлетворяет ли условие разветвления PayEmailConnector.
xxxxxx 17:46:16 — Detail: с помощью PayGateway проверяется, удовлетворяет ли условие разветвления StockConnector.
xxxxxx 17:46:16 — Detail: разветвление -1 (SendEmailActivity), отправлено письмо с содержанием: «Вы успешно оплатили заказ, общая сумма: 10 000».
xxxxxx 17:46:17 — Detail: после SendEmailActivity конвейер переходит к завершающему шлюзу.
xxxxxx 17:46:17 — Detail: разветвление -2 (StockActivity), увеличение запасов на 10 единиц.
xxxxxx 17:46:17 — Detail: после StockActivity конвейер переходит к завершающему шлюзу.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )