1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/osscore-OSS.PipeLine

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

OSS 事件 потока (OSS.Pipeline)

На основе управления бизнес-процессами с помощью BPM разработан облегчённый фреймворк для бизнес-процессов, который представляет собой базовую структуру для управления жизненным циклом и событиями в бизнес-домене.

В этом фреймворке функции управления процессами и событиями абстрагированы от объектов бизнес-домена, а также разорваны цепочки вызовов функций событий. Управление процессами централизовано в механизме процессов, а события обрабатываются как независимые единицы. Механизм процессов отвечает за запуск событий и передачу сообщений, обеспечивая изоляцию обработки событийных единиц.

Благодаря этому процессы становятся модулями, которые можно независимо программировать, и предоставляют возможность расширения бизнес-действий на верхнем уровне, гарантируя абсолютную независимость и возможность повторного использования бизнес-единиц. Цель состоит в том, чтобы обеспечить интеграцию различных функциональных кодов подобно сборке блоков, способствуя переходу к платформе с низким уровнем кода.

Если рассматривать весь бизнес-процесс как конвейер, то этот механизм процессов абстрагирует три основных компонента конвейера:

  1. Компонент событийной активности: Этот компонент предназначен для обработки конкретных задач, таких как отправка SMS, оформление заказов, уменьшение запасов и другие реальные бизнес-операции.

  2. Компонент шлюза: Этот компонент отвечает за логическую обработку направлений бизнес-процессов, включая ветвление и объединение процессов.

  3. Компонент потока сообщений: Этот компонент занимается передачей и преобразованием сообщений между другими компонентами.

Компонент событийной активности

Этот компонент реализует конкретную логику бизнес-функций, включая связанные автоматические действия, прерывания по событиям (например, пользовательские триггеры или сообщения из очереди) и другие действия.

Основываясь на активном/пассивном характере ситуаций и учитывая отношения между текущим действием и контекстом следующего узла, предоставляются четыре базовых класса действий (всего семь):

  • BaseActivity, BaseActivity, BaseActivity<TIn, TRes>, BaseActivity<TIn, TRes,TOut> — активные триггерные компоненты действий. Они часто используются для автоматического аудита или отправки электронных писем после успешной оплаты. Эти классы наследуются, и метод Executing переопределяется для реализации действий. Один и тот же поток автоматически связывает выполнение действий, и после завершения выполнения автоматически запускается следующий узел (с текущим контекстом).
    /// <summary>
    /// Конкретный метод расширения
    /// </summary>
    /// <returns>
    ```
    protected abstract Task<TrafficSignal> Executing();
  • BaseEffectActivity, BaseEffectActivity<TIn, TRes> — активные компоненты действий с влиянием на контекст. По умолчанию, когда обработка действия завершена, текущий контекст действия автоматически передаётся следующему узлу конвейера. Однако в некоторых случаях следующий узел может просто нуждаться в результате предыдущего действия, например, подтверждение заказа, требующее знания идентификатора заказа. В таких ситуациях используются эти классы, где результат метода Executing становится контекстом для следующего действия.
    /// <summary>
    ///  Конкретный метод расширения
    /// </summary>
    /// <returns>
    ```
        protected abstract Task<TrafficSignal<TRes>> Executing();
    ```

* BasePassiveActivity<TPara, TRes>, BasePassiveActivity<TPara, TRes,TOut>  пассивные компоненты действий, запускаемые по запросу. Когда поток достигает этого компонента, процесс останавливается, ожидая вызова метода Execute внешним узлом. После вызова поток продолжает движение к следующему узлу (параметр вызова становится новым контекстом). Эти классы наследуются и метод Executing переопределяется для реализации конкретных бизнес-логик.

```csharp
    /// <summary>
    ```
    /// Конкретный метод расширения
    /// ```
    /// <param name="para">Текущий контекст действия</param>
    /// <returns>
    ```
    protected abstract Task<TrafficSignal<TRes>> Executing(TPara para);
    ```

* BasePassiveEffectActivity<TPara, TRes>  пассивный компонент действий с влиянием на контекст, аналогичный активному компоненту действий с влиянием на контекст. Результат метода Executing становится контекстом для следующего узла.

Эти четыре класса действий включают метод Execute, который позволяет запускать действия непосредственно из любого узла. ### 4. BaseMsgSubscriber<TMsg> - 消息订阅者组件 (предоставляет реализацию по умолчанию: MsgSubscriber<TMsg>)

Ранее компонент предоставлял способ **подписки** на данные, источник триггера можно было легко изменить на очередь или базу данных (подробности см. в OSS.DataFlow [https://github.com/KevinWG/OSS.DataFlow]).

### Простой пример сценария

Предположим, что у нас есть сценарий управления запасами, который включает в себя несколько этапов: заявка на поставку, утверждение заявки, покупка и оплата, а также размещение на складе (с одновременным уведомлением заявителя по электронной почте). Каждый этап представляет собой событие-активность, например, заявка может быть определена следующим образом:
```csharp
    public class ApplyActivity : BaseEffectActivity<ApplyContext, long>
    {
        public ApplyActivity()
        {
            PipeCode = "ApplyActivity";
        }
        
        protected override Task<TrafficSignal<long>> Executing(ApplyContext para)
        {
            LogHelper.Info($"发起 [{para.name}] 采购申请");
            return Task.FromResult(new TrafficSignal<long>(100000001L));
        }
    }

Мы устанавливаем автоматическое выполнение заявки после утверждения, ожидая оплаты (пассивный тип). Мы определяем обработку для остальных этапов аналогичным образом, список выглядит следующим образом:

    ApplyActivity      - заявка (параметр: ApplyContext)
    AutoAuditActivity  - утверждение (параметр: long)
    PayActivity        - покупка (параметр: PayContext)
    StockActivity      - размещение (параметр: StockContext)
    EmailActivity      - отправка электронного письма (параметр: SendEmailContext)

Пять событий-активностей имеют независимые реализации и параметры. Поскольку покупка и оплата являются независимыми событиями, мы определяем разветвляющий шлюз для обработки разветвления (правило), код выглядит следующим образом:

    public class PayGateway : BaseBranchGateway<PayContext>
    {
        public PayGateway()
        {
            PipeCode = "PayGateway";
        }

        protected override bool FilterBranchCondition(PayContext branchContext, IPipe branch, string prePipeCode)
        {
            LogHelper.Info($"通过{PipeCode}  суждение о том, удовлетворяет ли ветвь {branch.PipeCode} условию разветвления!");
            return base.FilterBranchCondition(branchContext, branch, prePipeCode);
        }
    }

Здесь смысл относительно прост: все входящие ветви не фильтруются, а просто передаются напрямую.

Поскольку методы и параметры пяти событий не полностью совпадают, мы добавляем соединитель сообщений для преобразования и передачи сообщений (также можно выразить обработку во время создания потока), используя оплату в качестве примера преобразования параметров в сообщение:

    public class PayEmailConnector : BaseMsgConverter<PayContext, SendEmailContext>
    {
        public PayEmailConnector()
        {
            PipeCode = "PayEmailConnector";
        }
        protected override SendEmailContext Convert(PayContext inContextData)
        {
            // ......
            return new SendEmailContext() { body = $" 您成功支付了订单,总额:{inContextData.money}" };
        }
    }

Таким образом, определение процесса закупки завершено, и компоненты связаны следующим образом (здесь это класс модульного тестирования, в реальной бизнес-среде мы можем создать службу для обработки):

    [TestClass]
    public class BuyFlowTests
    {
        public readonly ApplyActivity     ApplyActivity = new ApplyActivity();
        public readonly AutoAuditActivity AuditActivity = new AutoAuditActivity();

        public readonly PayActivity PayActivity = new PayActivity();

        public readonly PayGateway PayGateway = new PayGateway();

        public readonly StockConnector StockConnector = new StockConnector();
        public readonly StockActivity  StockActivity  = new StockActivity();

        public readonly PayEmailConnector EmailConnector = new PayEmailConnector();
        public readonly SendEmailActivity EmailActivity  = new SendEmailActivity();



        private EndGateway _endNode = new EndGateway();

        //  Конструктор определяет связь с потоком
        public BuyFlowTests()
        {


            ApplyActivity
            .Append(AuditActivity)

            .Append(PayActivity)
            .Append(PayGateway);

            // Разветвляющий шлюз - ветка отправки электронной почты
            PayGateway
            .Append(EmailConnector)
            .Append(EmailActivity)
            .Append(_endNode);

            // Разветвляющий шлюз — ветка размещения на складе
            PayGateway
            .Append(StockConnector)
            .Append(StockActivity)
            .Append(_endNode);


        }

        [TestMethod]
        public async Task FlowTest()
        {
            await ApplyActivity.Execute(new ApplyContext()
            {
                name = "холодильник"
            });

            // Задержка на одну секунду, имитирующая операцию оплаты
            await Task.Delay(1000);

            await PayActivity.Execute(new PayContext()
            {
                count = 10,
                money = 10000
            });
            await Task.Delay(1000);// Ожидание завершения асинхронного журнала
        }

        [TestMethod]
``` ```
public void RouteTest()
{
    var TestPipeline = new Pipeline<ApplyContext, Empty>("test-flow", ApplyActivity, _endNode);

    var route = TestPipeline.ToRoute();
    Assert.IsTrue(route != null);
}

Запуск юнит-теста, результаты:

  • xxxxxx 17:46:15Detail: через ApplyActivity инициирована заявка на закупку из «холодильника».

  • xxxxxx 17:46:15Detail: заявка (номер 100000001) автоматически одобрена через AuditActivity.

  • xxxxxx 17:46:16Detail: выполнено действие PayActivity по оплате, количество: 10, сумма: 10 000.

  • xxxxxx 17:46:16Detail: с помощью PayGateway проверяется, удовлетворяет ли условие разветвления PayEmailConnector.

  • xxxxxx 17:46:16Detail: с помощью PayGateway проверяется, удовлетворяет ли условие разветвления StockConnector.

  • xxxxxx 17:46:16Detail: разветвление -1 (SendEmailActivity), отправлено письмо с содержанием: «Вы успешно оплатили заказ, общая сумма: 10 000».

  • xxxxxx 17:46:17Detail: после SendEmailActivity конвейер переходит к завершающему шлюзу.

  • xxxxxx 17:46:17Detail: разветвление -2 (StockActivity), увеличение запасов на 10 единиц.

  • xxxxxx 17:46:17Detail: после StockActivity конвейер переходит к завершающему шлюзу.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Расширить Свернуть
GPL-3.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://gitlife.ru/oschina-mirror/osscore-OSS.PipeLine.git
git@gitlife.ru:oschina-mirror/osscore-OSS.PipeLine.git
oschina-mirror
osscore-OSS.PipeLine
osscore-OSS.PipeLine
master