Расширение типов узлов

Тип узла по сути является операционной инструкцией. Разные инструкции представляют разные операции, выполняемые в рабочем процессе.

Как и в случае с триггером, расширение типов узлов также делится на две части: серверную и клиентскую. На сервере нужно реализовать логику зарегистрированной инструкции, а на клиенте — предоставить интерфейсную настройку параметров узла, в котором используется эта инструкция.

Серверная часть

Самая простая инструкция узла

Ключевое содержимое инструкции — функция, то есть для выполнения логики инструкции необходимо реализовать метод run в классе инструкции. Внутри функции можно выполнять любые необходимые операции: с базой данных, файлами, сторонними API и т. д.

Все инструкции должны наследоваться от базового класса Instruction. Самая простая инструкция требует только реализации функции run:

import { Instruction, JOB_STATUS } from '@nocobase/plugin-workflow';

export class MyInstruction extends Instruction {
  run(node, input, processor) {
    console.log('my instruction runs!');
    return {
      status: JOB_STATUS.RESOVLED,
    };
  }
}

Затем зарегистрируйте эту инструкцию в плагине рабочего процесса:

export default class MyPlugin extends Plugin {
  load() {
    // получение экземпляра плагина рабочего процесса
    const workflowPlugin = this.app.getPlugin<WorkflowPlugin>(WorkflowPlugin);

    // регистрация инструкции
    workflowPlugin.registerInstruction('my-instruction', MyInstruction);
  }
}

Значение статуса (status) в возвращаемом объекте инструкции обязательно и должно быть одним из значений константы JOB_STATUS. Это значение определяет дальнейший ход обработки для данного узла в рабочем процессе. Обычно используется JOB_STATUS.RESOVLED, что означает успешное выполнение узла и продолжение выполнения следующих узлов. Если есть результат, который нужно заранее сохранить, можно вызвать processor.saveJob и вернуть его возвращаемый объект. Исполнитель создаст запись результата выполнения на его основе.

Значение результата узла

Если есть конкретный результат выполнения, особенно данные для последующих узлов, его можно вернуть через свойство result и сохранить в объекте задачи узла:

import { Instruction, JOB_STATUS } from '@nocobase/plugin-workflow';

export class RandomStringInstruction extends Instruction {
  run(node, input, processor) {
    // пользовательская конфигурация из узла
    const { digit = 1 } = node.config;
    const result = `${Math.round(10 ** digit * Math.random())}`.padStart(
      digit,
      '0',
    );
    return {
      status: JOB_STATUS.RESOVLED,
      result,
    };
  },
};

Здесь node.config — это конфигурация узла, в которой могут быть любые необходимые значения. Она сохраняется как поле типа JSON в соответствующей записи узла в базе данных.

Обработка ошибок инструкции

Если при выполнении возможны исключения, их можно перехватить заранее и вернуть статус ошибки:

import { JOB_STATUS } from '@nocobase/plugin-workflow';

export const errorInstruction = {
  run(node, input, processor) {
    try {
      throw new Error('exception');
    } catch (error) {
      return {
        status: JOB_STATUS.ERROR,
        result: error,
      };
    }
  },
};

Если предсказуемые исключения не перехватываются, движок рабочего процесса автоматически перехватит их и вернёт статус ошибки, чтобы необработанные исключения не приводили к падению программы.

Асинхронные узлы

Если узлу нужно дождаться завершения внешней операции перед продолжением рабочего процесса, например HTTP-запроса, колбэка от платёжной системы или другой длительной операции, задачу сначала нужно сохранить со статусом JOB_STATUS.PENDING. Это приостанавливает текущее выполнение. После завершения внешней операции выполнение возобновляется через метод resume.Любой тип узла, использующий механизм приостановки выполнения, должен также реализовывать метод resume. Без него рабочий процесс не сможет продолжить выполнение.

Рекомендуемая схема реализации:

import { Instruction, JOB_STATUS, FlowNodeModel, IJob } from '@nocobase/plugin-workflow';

export class AsyncInstruction extends Instruction {
  async run(node: FlowNodeModel, prevJob, processor) {
    // 1. Сохраняем задачу в статусе ожидания и запоминаем её id
    const { id } = processor.saveJob({
      status: JOB_STATUS.PENDING,
      nodeId: node.id,
      nodeKey: node.key,
      upstreamId: prevJob?.id ?? null,
    });

    // 2. Явно вызываем exit(), чтобы записать задачу в БД и зафиксировать транзакцию
    await processor.exit();

    // 3. Запускаем асинхронную операцию (транзакция уже зафиксирована, соединение с БД не удерживается)
    const jobDone: IJob = { status: JOB_STATUS.PENDING };
    try {
      const result = await someAsyncOperation(node.config);
      jobDone.status = JOB_STATUS.RESOLVED;
      jobDone.result = result;
    } catch (error) {
      jobDone.status = JOB_STATUS.FAILED;
      jobDone.result = { message: error.message };
    } finally {
      // 4. Повторно получаем задачу из БД; не используем объект из памяти
      const job = await this.workflow.app.db.getRepository('jobs').findOne({
        filterByTk: id,
      });
      job.set(jobDone);

      // 5. Сообщаем движку рабочего процесса, что выполнение можно возобновить
      this.workflow.resume(job);
    }
    // 6. Ничего не возвращаем (void); исполнитель немедленно завершится
  }

  async resume(node: FlowNodeModel, job, processor) {
    // Задача уже получила финальный статус в run(), просто возвращаем её
    return job;
  }
}

Несколько важных деталей:

Почему нужно явно вызывать processor.exit(), а не возвращать задачу со статусом ожидания
return { status: PENDING } немедленно завершает функцию run, поэтому код после него уже не выполнится. Вызов await processor.exit() лишь фиксирует транзакцию и выходит из контекста базы данных, тогда как сама функция продолжает выполняться. Это позволяет дождаться длительной операции в том же теле функции и вызвать resume после её завершения. Если пропустить exit() и напрямую использовать await для длительной операции перед возвратом, транзакция базы данных будет удерживаться долгое время. Это может привести к конкуренции блокировок, а запись задачи не будет сохранена до фиксации транзакции после завершения операции.

Почему нужно повторно получать задачу, а не использовать объект, возвращённый saveJob?
Объект, возвращённый saveJob, — это экземпляр модели в памяти, привязанный к исходной транзакции. После вызова processor.exit() эта транзакция зафиксирована и закрыта. Прямое изменение этого экземпляра и вызов resume могут привести к некорректному состоянию ORM: устаревшим ссылкам на транзакцию, несогласованности состояния и другим проблемам. Повторный запрос задачи из базы данных по id гарантирует получение чистого экземпляра, не привязанного ни к какой транзакции.

Почему функция run ничего не возвращает (void)?
processor.exit() уже был вызван вручную. Когда исполнитель получает void, он вызывает exit(true) и немедленно завершается без лишней обработки. Если в этот момент вернуть IJob, исполнитель попытается повторно сохранить данные и зафиксировать транзакцию, что приведёт к ошибкам. Подробнее см. раздел о типах возвращаемых значений run/resume.

Для сценариев, требующих внешних колбэков (например, результатов платежа, полученных через webhook), применяется тот же подход: вызвать processor.exit() до регистрации колбэка, чтобы запись задачи уже была сохранена в базе данных до обратного вызова внешней системы. В колбэке нужно повторно получить задачу по id, а затем вызвать this.workflow.resume(job).

Полный пример из реального проекта: RequestInstruction.ts (узел HTTP-запроса, использующий этот паттерн в ветке асинхронного рабочего процесса)

Статус результата узла

Статус выполнения узла влияет на успех или сбой всего рабочего процесса. Обычно, если ветвей нет, ошибка узла напрямую приводит к ошибке всего рабочего процесса. Наиболее типичный сценарий: если узел выполнен успешно, выполнение переходит к следующему узлу в таблице узлов, и так до тех пор, пока не останется последующих узлов; после этого весь рабочий процесс завершается успешно.

Если во время выполнения узел возвращает статус ошибки, движок обрабатывает это по-разному в следующих двух случаях:

  1. Узел, вернувший ошибочный статус, находится в основном рабочем процессе, то есть не внутри подпроцесса ветви, открытого вышестоящим узлом. В этом случае весь основной рабочий процесс считается неуспешным, и процесс завершается.

  2. Узел, вернувший ошибочный статус, находится внутри подпроцесса ветви. В этом случае ответственность за определение следующего состояния рабочего процесса передаётся узлу, который открыл ветвь. Его внутренняя логика определяет состояние последующего потока, и это решение рекурсивно распространяется вверх до основного рабочего процесса.

В итоге следующее состояние всего рабочего процесса определяется на уровне узлов основного потока. Если узел в основном потоке возвращает ошибку, весь рабочий процесс завершается со статусом сбоя.

Если любой узел после выполнения возвращает статус «ожидание», весь процесс выполнения временно прерывается и приостанавливается, ожидая событие, определённое соответствующим узлом, которое возобновит рабочий процесс. Например, узел «Ручная обработка» при выполнении остановится на этом узле со статусом «ожидание», ожидая ручного решения об одобрении. Если вручную введён статус одобрения, выполнение последующих узлов продолжится; иначе применится логика ошибки, описанная выше.

О других статусах возврата инструкции см. Справочник API.

Типы возвращаемых значений run/resume и поведение исполнителя

Полное определение типа возвращаемого значения методов run и resume:

type InstructionResult = IJob | Promise<IJob> | Promise<void> | Promise<null> | null | void;

После вызова инструкции исполнитель (Processor) выполняет различную логику обработки в зависимости от типа возвращаемого значения. Существует три случая.

1. Возврат объекта задачи IJob

Наиболее распространённый случай. Возвращается объект с обязательным полем status и необязательным полем result. Исполнитель сохраняет его как запись задачи узла и определяет дальнейший ход выполнения по значению status:

  • JOB_STATUS.RESOLVED: Узел успешно выполнен; продолжает выполнение следующего узла при его наличии, иначе рабочий процесс завершается
  • JOB_STATUS.PENDING: Узел переходит в состояние ожидания; текущий контекст выполнения останавливается, ожидая внешнего события для запуска resume
  • Другие статусы ошибок (FAILED, ERROR и т.д.): Передаются родительскому узлу ветки или напрямую завершают весь рабочий процесс

Этот путь является полным путём фиксации транзакции — исполнитель сохраняет запись задачи, записывает в базу данных и фиксирует транзакцию.

Пример: ConditionInstruction.ts (возвращает объект job напрямую при отсутствии ветки; при наличии ветки см. случай void ниже)

2. Возврат null

При возврате null исполнитель вызывает processor.exit() (без аргументов), что приводит к: сбросу текущих ожидающих задач в базу данных и фиксации транзакции, но без обновления общего статуса выполнения.

Такое использование характерно для метода resume узлов управления ветками: ветка завершилась и статус задачи родительского узла необходимо обновить и сохранить (например, записать «ветка N завершена»), но другие ветки ещё выполняются, и общее выполнение должно оставаться в статусе STARTED, ожидая оставшихся веток — возврат null выходит из текущего контекста resume без влияния на общий статус выполнения.

Пример: ParallelInstruction.ts

  • Строка 117: Параллельный узел уже завершён досрочно (resolved/rejected); игнорирует последующие resume веток и возвращает null напрямую
  • Строка 135: Некоторые ветки ещё не завершены (PENDING); сохраняет текущий прогресс и возвращает null для продолжения ожидания других веток

3. Возврат void (без возврата, т.е. неявный undefined)

При возврате void (функция не содержит явного оператора return, или путь выполнения завершается без возвращаемого значения) исполнитель вызывает processor.exit(true), что приводит к немедленному возврату без выполнения каких-либо операций с базой данных.

Этот паттерн используется исключительно в сценариях, когда инструкция взяла на себя управление планированием выполнения: инструкция вручную запускает подпроцесс через processor.run(), и цепочка выполнения подпроцесса самостоятельно обработает запись в базу данных и фиксацию транзакции при завершении. Исполнитель не должен повторно обрабатывать данные.

Типичные примеры:

  • ConditionInstruction.ts#L67: При наличии ветки вручную вызывает processor.run(branchNode, savedJob), затем функция завершается, неявно возвращая void
  • ParallelInstruction.ts#L108: Перебирает все ветки и вызывает processor.run(branch, job) для каждой, затем функция завершается, неявно возвращая void

:::warn{title=Предупреждение} Если перед возвратом void был вызван processor.saveJob(), эти записи задач не будут записаны в базу данных текущим исполнителем. Они временно хранятся в списке задач исполнителя (в памяти) и будут сброшены в базу данных через exit(), вызванный при завершении подвыполнения, запущенного processor.run(). Поэтому при использовании данного паттерна необходимо убедиться, что существует путь подвыполнения, который завершается в штатном режиме для сохранения этих записей. Планирование ветвящихся рабочих процессов обладает определённой сложностью; требует тщательного проектирования и всестороннего тестирования. :::

Сравнительная таблица трёх возвращаемых значений:

Возвращаемое значениеПоведение исполнителяТипичный сценарий использования
IJobСохраняет задачу, продолжает/завершает/приостанавливает поток на основе statusОбычное выполнение узла с результатом и статусом
nullСбрасывает ожидающие задачи и фиксирует транзакцию, не обновляет статус выполненияВетка ещё ожидает, временно выходит из текущего контекста выполнения
voidНемедленно возвращается, без операций с БДУзел запланировал подпроцесс, позволяя ему взять на себя последующую обработку

Дополнительно

Определения различных параметров для задания типов узлов см. в Справочнике API.

Клиентская часть

Как и для триггера, форму настройки для инструкции (типа узла) нужно реализовать на клиенте.

Самая простая инструкция узла

Все инструкции должны наследоваться от базового класса Instruction. Его свойства и методы используются для настройки и использования узла.

Например, если нужно предоставить интерфейс настройки для узла «строка случайного числа» (randomString), определённого выше на сервере, у которого есть параметр digit (количество цифр), в форме настройки можно использовать поле числового ввода для приёма значения от пользователя.

import WorkflowPlugin, { Instruction, VariableOption } from '@nocobase/workflow/client';

class MyInstruction extends Instruction {
  title = 'Random number string';
  type = 'randomString';
  group = 'extended';
  fieldset = {
    'digit': {
      type: 'number',
      title: 'Digit',
      name: 'digit',
      'x-decorator': 'FormItem',
      'x-component': 'InputNumber',
      'x-component-props': {
        min: 1,
        max: 10,
      },
      default: 6,
    },
  };
  useVariables(node, options): VariableOption {
    return {
      value: node.key,
      label: node.title,
    };
  }
}

export default class MyPlugin extends Plugin {
  load() {
    // получение экземпляра плагина рабочего процесса
    const workflowPlugin = this.app.getPlugin<WorkflowPlugin>(WorkflowPlugin);

    // регистрация инструкции
    workflowPlugin.registerInstruction('randomString', MyInstruction);
  }
}
Примечание

Идентификатор типа узла, зарегистрированный на клиенте, должен совпадать с идентификатором на сервере, иначе возникнут ошибки.

Предоставление результатов узла как переменных

В приведённом выше примере используется метод useVariables. Если нужно использовать результат узла (часть result) как переменную в последующих узлах, нужно реализовать этот метод в унаследованном классе инструкции и вернуть объект, соответствующий типу VariableOption. Этот объект задаёт структуру результата выполнения узла и сопоставление имён переменных для выбора и использования в следующих узлах.

Тип VariableOption определяется так:

export type VariableOption = {
  value?: string;
  label?: string;
  children?: VariableOption[] | null;
  [key: string]: any;
};

Ключевым является свойство value: это сегментированный путь имени переменной. Свойство label задаёт подпись в интерфейсе, а children — многоуровневую структуру переменных, когда результат узла — глубоко вложенный объект.

Используемая переменная внутри системы задаётся как строка шаблона пути, разделённого точкой, например {{jobsMapByNodeKey.2dw92cdf.abc}}. Здесь jobsMapByNodeKey — набор результатов всех узлов (внутреннее определение, отдельно обрабатывать не нужно), 2dw92cdfkey узла, а abc — пользовательское свойство в объекте результата узла.

Кроме того, поскольку результат узла может быть и простым значением, при предоставлении переменных узла первый уровень обязательно должен быть описанием самого узла:

{
  value: node.key,
  label: node.title,
}

То есть первый уровень — это key и заголовок узла. Например, в исходном коде узла «Вычисление» при использовании результата узла «Вычисление» в интерфейсе отображаются такие варианты:

Результат узла вычисления

Когда результат узла — сложный объект, можно использовать children для описания вложенных свойств. Например, пользовательская инструкция может возвращать такие JSON-данные:

{
  "message": "ok",
  "data": {
    "id": 1,
    "name": "test",
  }
}

Тогда через метод useVariables можно вернуть описание так:

useVariables(node, options): VariableOption {
  return {
    value: node.key,
    label: node.title,
    children: [
      {
        value: 'message',
        label: 'Message',
      },
      {
        value: 'data',
        label: 'Data',
        children: [
          {
            value: 'id',
            label: 'ID',
          },
          {
            value: 'name',
            label: 'Name',
          },
        ],
      },
    ],
  };
}

После этого в последующих узлах можно выбирать переменные через такой интерфейс:

Сопоставленные переменные результата

Примечание

Когда структура результата является массивом глубоко вложенных объектов, путь также можно описывать через children, но без индексов массива. Это связано с тем, что в обработке переменных NocoBase в рабочем процессе описание пути для массива объектов автоматически разворачивается в массив вложенных значений, и обратиться к конкретному значению по индексу нельзя.

Доступность узла

По умолчанию в рабочий процесс можно добавить любой узел. Однако в некоторых случаях узел может быть неприменим для определённых типов рабочего процесса или ветвей. Тогда можно задать доступность узла через isAvailable:

// определение типа
export abstract class Instruction {
  isAvailable?(ctx: NodeAvailableContext): boolean;
}

export type NodeAvailableContext = {
  // экземпляр плагина рабочего процесса
  engine: WorkflowPlugin;
  // экземпляр рабочего процесса
  workflow: object;
  // вышестоящий узел
  upstream: object;
  // ветвящийся узел (номер ветви)
  branchIndex: number;
};

Метод isAvailable возвращает true, если узел доступен, и false, если недоступен. Параметр ctx содержит контекст текущего узла для определения доступности.

Если особых требований нет, метод isAvailable можно не реализовывать: узлы по умолчанию доступны. Частый сценарий — узел выполняет длительную операцию и не подходит для синхронного рабочего процесса; тогда ограничение задают через isAvailable. Например:

isAvailable({ engine, workflow, upstream, branchIndex }) {
  return !engine.isWorkflowSync(workflow);
}

Дополнительно

Описание параметров типов узлов см. в Справочнике API.