Команды и Обработчики команд

 

В предыдущей главе мы говорили об использовании событий как способа представления входных данных для нашей системы, и превратили наше приложение в машину обработки сообщений.

Для этого мы преобразовали все наши use-case функции в обработчики событий. Когда API получает POST для создания нового пакета (batch), он выстраивает новое событие 'BatchCreated' и обрабатывает его так, как если бы это было внутреннее событие. Это может показаться нелогичным. В конце концов, пакет еще не создан; вот почему мы вызвали API. Мы собираемся исправить эту концептуальную бородавку, введя команды и показав, как они могут обрабатываться одной и той же шиной сообщений, но с легка другими правилами.

Код для этой главы находится в chapter_10_commands branch on GitHub:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_10_commands
# или, чтобы кодировать вместе, проверьте предыдущую главу:
git checkout chapter_09_all_messagebus

Commands и Events

Как и события, commands-это тип message—​instructions, посылаемые одной частью системы другой. Мы обычно представляем команды с тупыми структурами данных и можем обрабатывать их почти так же, как события.

Однако различия между командами(commands) и событиями(events) очень важны.

Команды посылаются одним актором другому конкретному актору в надежде, что в результате произойдет то или иное событие. Когда мы отправляем форму обработчику API, мы посылаем команду. Мы называем команды глаголами повелительного наклонения, такими как "выделить запас" (allocate stock) или "задержать отгрузку"(delay shipment).

Команды захватывают намерение. Они выражают наше желание, чтобы система что-то сделала. В результате, когда они потерпят неудачу, отправитель должен получить информацию об ошибке.

События транслируются актором всем заинтересованным слушателям (listeners). Когда мы публикуем 'BatchQuantityChanged', мы не знаем, кто его возьмет. Мы называем события глагольными фразами прошедшего времени, такими как "заказ распределен на складе"(order allocated to stock) или "отгрузка задержана" (shipment delayed).

Мы часто используем события для распространения знаний об успешных командах.

События фиксируют факты о том, что происходило в прошлом. Поскольку мы не знаем, кто обрабатывает событие, отправителей не должно волновать, удовлетворены ли получатели или нет. События против команд резюмирует различия.

Table 1. События против команд
СобытиеКоманда

Название

Прошедшее время

Повелительное наклонение

Обработка ошибок

Частная неудача

Шумный Сбой

Отправлено для

Всех слушателей

Единственного получателя

Какие команды есть у нас сейчас в нашей системе?

Example 1. Pulling out some commands (src/allocation/domain/commands.py)
class Command:
    pass

@dataclass
class Allocate(Command):  
    orderid: str
    sku: str
    qty: int

@dataclass
class CreateBatch(Command):  
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

@dataclass
class ChangeBatchQuantity(Command):  
    ref: str
    qty: int
commands.Allocate заменит events.AllocationRequired.
commands.CreateBatch заменит events.BatchCreated.
commands.ChangeBatchQuantity заменит events.BatchQuantityChanged.

Различия в обработке исключений

Просто изменение имен и форм глаголов-это очень хорошо, но это не изменит поведение нашей системы. Мы хотим относиться к событиям и командам одинаково, но не совсем одинаково. Давайте посмотрим, как меняется наша шина сообщений:

Example 2. Вилка для отправки событий и команд (src/allocation/service_layer/messagebus.py)
Message = Union[commands.Command, events.Event]


def handle(message: Message, uow: unit_of_work.AbstractUnitOfWork):  
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)  
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)  
            results.append(cmd_result)
        else:
            raise Exception(f'{message} was not an Event or Command')
    return results
У него все еще есть главная точка входа handle(), которая принимает message, который может быть командой или событием.
Мы отправляем события и команды двум различным вспомогательным функциям, показанным далее.

Вот как мы справляемся с событиями:

Example 3. События не могут прервать поток (src/allocation/service_layer/messagebus.py)
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    for handler in EVENT_HANDLERS[type(event)]:  
        try:
            logger.debug('обработка события %s с помощью обработчика %s', event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception('Exception handling event %s', event)
            continue  
События передаются диспетчеру, который может делегировать их нескольким обработчикам на одно событие.
Он ловит и регистрирует ошибки, но не позволяет им прерывать обработку сообщений.

А вот команду мы делаем так:

Example 4. Commands reraise exceptions (src/allocation/service_layer/messagebus.py)
def handle_command(
    command: commands.Command,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    logger.debug('handling command %s', command)
    try:
        handler = COMMAND_HANDLERS[type(command)]  
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result  
    except Exception:
        logger.exception('Exception handling command %s', command)
        raise  
Диспетчер команд ожидает только одного обработчика для каждой команды.
Если возникают какие-либо ошибки, они быстро терпят неудачу и будут пузыриться.
возвращаемый результат является только временным; как уже упоминалось в [temporary_ugly_hack], это временный хак, позволяющий шине сообщений возвращать пакетную ссылку для использования API. Мы исправим это в [chapter_12_cqrs].

Мы также меняем отдельные HANDLERS на словарь для разных команд и событий. Команды могут иметь только один обработчик, согласно нашему соглашению:

Example 5. Словари новых обработчиков (src/allocation/service_layer/messagebus.py)
EVENT_HANDLERS = {
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

COMMAND_HANDLERS = {
    commands.Allocate: handlers.allocate,
    commands.CreateBatch: handlers.add_batch,
    commands.ChangeBatchQuantity: handlers.change_batch_quantity,
}  # type: Dict[Type[commands.Command], Callable]

Обсуждение: Events, Commands, и Error Handling

Многие разработчики испытывают дискомфорт в этот момент и спрашивают: "Что произойдёт, когда событие не получиться обработать? Как мне понять, что система находится в консистентном состоянии?" Если нам удастся обработать половину событий во время messagebus.handle прежде чем ошибка нехватки памяти убьет наш процесс, как мы можем сгладить проблемы, вызванные потерянными сообщениями?

Начнем с наихудшего случая: мы не справляемся с событием, и система остается в противоречивом состоянии. Какая ошибка могла бы вызвать это? Часто в наших системах мы можем оказаться в несогласованном состоянии, когда завершена только половина операции.

Например, представим, что мы выделили три единицы DESIRABLE_BEANBAG для какого то заказа клиента, но почему-то не смогли уменьшить количество оставшихся запасов. Это привело бы к несогласованному состоянию: три единицы запаса одновременно распределены и доступны, в зависимости от того, с какой стороны на это посмотреть. Позже мы могли бы выделить те же самые BEANBAG-и другому клиенту, что вызвало бы головную боль у службы поддержки клиентов.

Однако в нашей службе распределения мы уже предприняли шаги для предотвращения такой ситуации. Мы тщательно определили aggregates, которые действуют как границы согласованности, и ввели UoW, который управляет атомарным успехом или неудачей обновления агрегата.

For example, when we allocate stock to an order, our consistency boundary is the Product aggregate. This means that we can’t accidentally overallocate: either a particular order line is allocated to the product, or it is not—​there’s no room for inconsistent states. Например, когда мы распределяем запасы по заказу, нашей границей согласованности является агрегат Product. Это означает, что мы не можем случайно распределить: либо конкретная строка заказа выделяется продукту, либо нет — другого не дано, ибо нет места для несогласованных состояний.

По определению, мы не требуем, чтобы два агрегата были немедленно согласованы, поэтому, если мы не сможем обработать событие и обновить только один агрегат, наша система все равно может быть в конечном итоге согласована. Мы не должны нарушать никаких ограничений системы.

Разбирая этот пример, мы можем лучше понять причину разделения сообщений на команды и события. Когда пользователь хочет заставить систему что-то сделать, мы представляем его запрос как команду. Эта команда должна изменить один aggregate и либо преуспеть, либо потерпеть неудачу в целом. Любая другая бухгалтерия, очистка и уведомление, которые нам нужно сделать, могут происходить через event. Мы не требуем, чтобы обработчики событий были успешными, чтобы команда была успешной.

Давайте рассмотрим другой пример (из другого, воображаемого проекта), чтобы понять, почему нет.

Представьте себе, что мы создаем интернет-магазин, который продает дорогие предметы роскоши. Наш отдел маркетинга желает вознаграждать клиентов за повторные визиты. Мы будем отмечать клиентов как VIP-персон после того, как они совершат свою третью покупку, и это даст им право на приоритетное обслуживание и специальные предложения. Наши критерии принятия этой истории гласят следующее:

Используя методы, которые мы уже обсуждали в этой книге, мы решаем, что хотим создать новый агрегат History, который записывает заказы и может вызывать события домена при выполнении правил. Мы будем структурировать код следующим образом:

Example 6. VIP customer (example code for a different project)
Агрегат History фиксирует правила, указывающие, когда клиент становится VIP-персоной. Это особенно станет заметным, когда в будущем правила станут более сложными для внесения изменениий.
Наш первый хандлер создает заказ для клиента и вызывает доменное событие OrderCreated.
Второй обновляет объект History, чтобы записать, что заказ был created.
Наконец, мы отправляем электронное письмо клиенту, когда он становится VIP-персоной.

Используя этот код, мы можем получить некоторое представление об обработке ошибок в системе, управляемой событиями.

В нашей текущей реализации события агрегата, которые сохраняют наше состояние в базе данных, вызываем после. Что если бы мы подняли эти события, прежде чем сохранились, и зафиксировали все наши изменения одновременно? Таким образом, мы можем быть уверены, что вся работа завершена. Разве это не безопаснее?

Однако что происходит, если почтовый сервер немного перегружен? Если вся работа должна быть завершена одновременно, занятый почтовый сервер может помешать нам принимать оплату за заказы.

Что произойдет, если в реализации агрегата History есть ошибка? Неужели мы откажемся от ваших денег только потому, что не можем признать вас VIP-персоной?

Разделив эти пересекающиеся задачи, мы тем самым изолировали выходы из строя, что повышает общую надежность системы. Единственная часть этого кода, которую необходимо выполнить, - это обработчик команды, создания заказа. Это единственная часть, которая заботит покупателя, и это та часть, которую наши участники проекта должны расставить по приоритетам.

Обратите внимание, как мы намеренно выровняли наши транзакционные границы с началом и концом бизнес-процессов. Имена, которые мы используем в коде, соответствуют жаргону, используемому нашими заинтересованными сторонами в бизнесе, а обработчики, которые мы написали, соответствуют шагам наших критериев выбора естественного языка. Такое соответствие имен и структур помогает нам рассуждать о наших системах по мере того, как они становятся все больше и сложнее.

Синхронное восстановление после ошибок

Hopefully we’ve convinced you that it’s OK for events to fail independently from the commands that raised them. What should we do, then, to make sure we can recover from errors when they inevitably occur? Надеюсь, мы были достаточно убедительны показывая, что события могут завершиться неудачей независимо от команд, которые их вызвали. Что же нам тогда делать, чтобы убедиться, что мы можем оправиться от ошибок, когда они неизбежно произойдут?

Первое, что нам нужно, - это знать, когда произошла ошибка, и для этого мы обычно полагаемся на лог-файлы.

Давайте еще раз рассмотрим метод handle_event из нашей шины сообщений:

Example 7. Текущая функция обработчик (src/allocation/service_layer/messagebus.py)
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            logger.debug('handling event %s with handler %s', event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception('Exception handling event %s', event)
            continue

Когда мы обрабатываем сообщение в нашей системе, первое, что мы делаем, - это записываем строку лога, для фиксации того, что мы собираемся сделать. Для нашего варианта использования CustomerBecameVIP журналы могут выглядеть следующим образом:

Handling event CustomerBecameVIP(customer_id=12345)
with handler <function congratulate_vip_customer at 0x10ebc9a60>

Because we’ve chosen to use dataclasses for our message types, we get a neatly printed summary of the incoming data that we can copy and paste into a Python shell to re-create the object. Поскольку мы решили использовать классы данных (dataclasses) для наших типов сообщений (message types), мы получаем аккуратно напечатанную сводку входящих данных, которую мы можем скопировать и вставить в оболочку Python для повторного создания объекта.

Когда возникает ошибка, мы можем использовать зарегистрированные данные, чтобы либо воспроизвести проблему в модульном тесте, либо воспроизвести сообщение в системе.

Ручной повтор хорошо работает в тех случаях, когда нам нужно исправить ошибку, прежде чем мы сможем повторно обработать событие, но наши системы всегда будут подвержены фоновым помехам переходного уровня. Это включает в себя такие вещи, как сбои сети, взаимоблокировки таблиц и кратковременные паузы, вызванные развертываниями.

В большинстве этих случаев мы можем элегантно восстановиться, попробовав еще раз. Как гласит пословица: "Если сначала у вас ничего не получится, повторите операцию с экспоненциально увеличивающимся периодом ожидания."

Example 8. Обработка с повтором (src/allocation/service_layer/messagebus.py)
Tenacity-это библиотека Python, которая реализует общие шаблоны для повторных попыток.
Здесь мы настраиваем нашу шину сообщений для повторения операций до трех раз с экспоненциально увеличивающейся паузой между попытками.

Повторные вызовы операций, которые могут потерпеть неудачу, - это, вероятно, единственный лучший способ повысить устойчивость нашего программного обеспечения. Опять же, шаблоны Unit of Work и Command Handler означают, что каждая попытка начинается с согласованного состояния и не оставит выполнение заданий наполовину законченными.

В какой-то момент, независимо от tenacity, нам придется отказаться от попыток обработать сообщение. Строить надежные системы с распределенными сообщениями непросто, и нам приходится пропускать некоторые сложные моменты. Поэтому, будет полезно изучить дополнительные справочные материалы в epilogue.

Подведение итогов

В этой книге мы решили представить концепцию событий до концепции команд, но другие руководства часто делают это наоборот. Сделать явными запросы, на которые наша система может ответить, дав им имя и их собственную структуру данных, - это довольно фундаментальная вещь. Иногда вы заметите, как используется шаблон Command Handler для описания того, что мы делаем с Events, Commands, и Message Bus.

Разделение команд и событий: компромиссы обсуждает некоторые моменты, о которых вам следует подумать, прежде чем запрыгивать на борт.

Table 2. Разделение команд и событий: компромиссы
ПлюсыМинусы
  • Различное отношение к командам и событиям помогает нам понять, какие вещи должны быть успешными, а какие мы можем привести в порядок позже.

  • CreateBatch безусловно, менее запутанное имя, чем BatchCreated. Мы явно выражаем намерения наших пользователей, а явное лучше, чем неявное, верно?

  • Семантические различия между командами и событиями могут быть незначительными. Ожидаемы споры по поводу различий.

  • Мы явно напрашиваемся на неудачу. Мы знаем, что иногда что-то ломается, и мы решаем справиться с этим, делая неудачи меньше и более изолированными. Это может затруднить работу системы и требует лучшего мониторинга.

В [chapter_11_external_events] мы поговорим об использовании событий в качестве шаблона интеграции.

Комментарии

Популярные сообщения из этого блога

4.Наш первый Use Case или пример использования: Flask API и Service Layer

Введение

2.Repository Pattern