Разработка плагинов

Кластер NocoBaseEnterprise Edition+

Контекст

В одноузловой среде плагины обычно выполняют свои задачи с помощью данных, которые хранятся в памяти процесса, событий или фоновых задач. Но в кластерном режиме один и тот же плагин может быть запущен сразу на нескольких экземплярах приложения. Из-за этого могут возникать типичные проблемы:

  • Согласованность состояния: если конфигурация или временные данные хранятся только в памяти одного экземпляра, другие экземпляры не смогут получить актуальное состояние. Это может привести к чтению устаревших данных или повторному выполнению одной и той же операции.
  • Планирование задач: если нет отдельного механизма очередей и подтверждения выполнения, длительная задача может быть запущена одновременно на нескольких экземплярах.
  • Состояния гонки: операции, которые изменяют схему данных или распределяют ресурсы, должны выполняться последовательно. Иначе при одновременной записи могут возникнуть конфликты.

Ядро NocoBase предоставляет на уровне приложения набор интерфейсов промежуточного слоя. Они помогают плагинам использовать общие механизмы для работы в кластерной среде: кэширование, синхронные сообщения, очереди сообщений и распределённые блокировки. Ниже приведены рекомендации и примеры их использования.

Решения

Компонент кэша

Для данных, которые нужно хранить в памяти, рекомендуется использовать встроенный компонент кэша.

  • Экземпляр кэша по умолчанию — через app.cache.
  • Cache предоставляет set/get/del/reset, а также wrap и wrapWithCondition для инкапсуляции логики кэширования, плюс пакетные методы mset/mget/mdel.
  • При кластерном развёртывании общие данные лучше хранить в постоянном хранилище (например, Redis) и задавать разумный ttl, чтобы избежать потери кэша при перезапуске экземпляра.

Пример: инициализация и использование кэша в plugin-auth

Создание
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

Менеджер синхронных сообщений

Если состояние в памяти нельзя вынести в распределённый кэш (например, его нельзя сериализовать), то при изменении состояния пользовательскими действиями изменение нужно рассылать другим экземплярам через сигнал синхронизации для сохранения согласованности.

  • Базовый класс Plugin уже реализует sendSyncMessage, который внутри вызывает app.syncMessageManager.publish и автоматически добавляет префикс уровня приложения к каналу, чтобы избежать конфликтов.
  • В publish можно передать transaction; сообщение будет отправлено после фиксации транзакции БД, что обеспечивает согласованность состояния и сообщения.
  • Для обработки сообщений от других экземпляров используйте handleSyncMessage. Подписка на этапе beforeLoad хорошо подходит для сценариев изменений конфигурации и синхронизации схемы.

Пример: plugin-data-source-main использует сообщения синхронизации для согласованности схемы между узлами

Синхронизация
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // автоматически вызывает app.syncMessageManager.publish
  }
}

Менеджер Pub/Sub

Широковещательная рассылка сообщений — лежит в основе сигналов синхронизации, но её можно использовать и напрямую. Когда нужно рассылать сообщения между экземплярами, используйте этот компонент.

  • app.pubSubManager.subscribe(channel, handler, { debounce }) подписывает канал между экземплярами; параметр debounce снижает частоту вызовов обработчика при повторных рассылках.
  • publish поддерживает skipSelf (по умолчанию true) и onlySelf, чтобы контролировать возврат сообщения в текущий экземпляр.
  • До старта приложения нужно настроить адаптер (Redis, RabbitMQ и т. п.), иначе внешняя система сообщений по умолчанию не подключается.

Пример: plugin-async-task-manager использует Pub/Sub для рассылки событий отмены задач

Трансляция
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

Компонент очереди событий

Очередь сообщений используется для планирования асинхронных задач и подходит для длительных или повторяемых операций.

  • Объявляйте обработчик очереди через app.eventQueue.subscribe(channel, { idle, process, concurrency }). process возвращает Promise; для таймаутов можно использовать AbortSignal.timeout.
  • publish автоматически добавляет префикс имени приложения и поддерживает timeout, maxRetries. По умолчанию используется адаптер очереди в памяти процесса, при необходимости его можно заменить на внешние адаптеры вроде RabbitMQ.
  • В кластере на всех узлах должен быть настроен один и тот же адаптер очереди, иначе распределение задач между узлами будет несогласованным.

Пример: plugin-async-task-manager использует компонент очереди событий для планирования задач

Постановка
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

Менеджер распределённых блокировок

Чтобы избежать состояний гонки, используйте распределённую блокировку для сериализации доступа к ресурсу.

  • По умолчанию доступен процессный адаптер local. Можно регистрировать распределённые реализации, например на базе Redis. Для контроля конкурентности используйте app.lockManager.runExclusive(key, fn, ttl) или acquire/tryAcquire.
  • ttl задаёт срок жизни блокировки и служит страховкой от бессрочного удержания при ошибках.
  • Типовые сценарии: изменения схемы, предотвращение дублирования задач, ограничение частоты операций и т. п.

Пример: plugin-data-source-main использует распределённую блокировку для защиты процесса удаления полей

Сериализация
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

Рекомендации по разработке

  • Согласованность состояния в памяти: по возможности избегайте хранения состояния только в памяти процесса; для согласованности используйте кэш или сообщения синхронизации.
  • Переиспользуйте встроенные интерфейсы: используйте app.cache, app.syncMessageManager и др., чтобы не реализовывать межузловую коммуникацию в плагинах заново.
  • Учитывайте границы транзакций: для операций с транзакциями используйте transaction.afterCommit (syncMessageManager.publish уже это поддерживает), чтобы обеспечить согласованность данных и сообщений.
  • Настраивайте повторные попытки с нарастающей задержкой: для задач в очереди и широковещательных рассылок задавайте разумные timeout, maxRetries, debounce, чтобы избегать всплесков нагрузки при сбоях.
  • Используйте мониторинг и логи: фиксируйте в логах каналы, полезную нагрузку сообщений, ключи блокировок и т. п. для диагностики нестабильных сбоев в кластере.

С этими возможностями плагины могут безопасно разделять состояние, синхронизировать конфигурацию и планировать задачи между разными экземплярами, удовлетворяя требованиям стабильности и согласованности в сценариях кластерного развёртывания.