logologo
Начало
Руководство
Разработка
Плагины
API
Главная
English
简体中文
日本語
한국어
Español
Português
Deutsch
Français
Русский
Начало
Руководство
Разработка
Плагины
API
Главная
logologo
Кластерный режим
Обзор
Подготовка
Развертывание в Kubernetes
Процессы эксплуатации
Разделение сервисов
Справочник для разработчиков
Previous PageРазделение сервисов
Уведомление о переводе ИИ

Эта документация была автоматически переведена ИИ.

#Разработка плагинов

#Проблематика

В одноузловой среде плагины обычно могут выполнять свои задачи, используя внутрипроцессное состояние, события или задачи. Однако в кластерном режиме один и тот же плагин может одновременно работать на нескольких экземплярах, сталкиваясь со следующими типичными проблемами:

  • Согласованность состояния: Если данные конфигурации или данные времени выполнения хранятся только в памяти, их сложно синхронизировать между экземплярами, что может привести к «грязному чтению» или повторному выполнению операций.
  • Планирование задач: Без четкого механизма постановки в очередь и подтверждения длительные задачи могут выполняться одновременно несколькими экземплярами.
  • Состояния гонки: Операции, связанные с изменением схемы или распределением ресурсов, требуют сериализации для предотвращения конфликтов, вызванных одновременной записью.

Ядро NocoBase предоставляет на уровне приложения различные интерфейсы промежуточного ПО, которые помогают плагинам использовать унифицированные возможности в кластерной среде. Ниже мы рассмотрим использование и лучшие практики для кэширования, синхронных сообщений, очередей сообщений и распределенных блокировок, с примерами из исходного кода.

#Решения

#Компонент кэширования (Cache)

Для данных, которые необходимо хранить в памяти, рекомендуется использовать встроенный компонент кэширования системы.

  • Получите экземпляр кэша по умолчанию через app.cache.
  • Cache предоставляет базовые операции, такие как set/get/del/reset, а также поддерживает wrap и wrapWithCondition для инкапсуляции логики кэширования, и пакетные методы, такие как mset/mget/mdel.
  • При развертывании в кластере рекомендуется размещать общие данные в постоянном хранилище (например, Redis) и устанавливать разумное значение ttl, чтобы предотвратить потерю кэша при перезапуске экземпляра.

Пример: Инициализация и использование кэша в plugin-auth

Создание
// packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts
async load() {
  this.cache = await this.app.cacheManager.createCache({
    name: 'auth',
    prefix: 'auth',
    store: 'redis',
  });

  await this.cache.wrap('token:config', async () => {
    const repo = this.app.db.getRepository('tokenPolicies');
    return repo.findOne({ filterByTk: 'default' });
  }, 60 * 1000);
}

#Менеджер синхронных сообщений (SyncMessageManager)

Если состояние в памяти не может быть управляемо с помощью распределенного кэша (например, его невозможно сериализовать), то при изменении состояния в результате действий пользователя необходимо уведомить об этих изменениях другие экземпляры с помощью синхронного сигнала для поддержания согласованности состояния.

  • Базовый класс плагина уже реализует sendSyncMessage, который внутренне вызывает app.syncMessageManager.publish и автоматически добавляет префикс уровня приложения к каналу, чтобы избежать конфликтов каналов.
  • Метод publish может указывать transaction, и сообщение будет отправлено после фиксации транзакции базы данных, что гарантирует синхронизацию состояния и сообщения.
  • Обрабатывайте сообщения, поступающие от других экземпляров, с помощью handleSyncMessage. Подписка на сообщения на этапе beforeLoad очень подходит для сценариев, таких как изменение конфигурации и синхронизация схемы.

Пример: plugin-data-source-main использует синхронные сообщения для поддержания согласованности схемы между несколькими узлами

Синхронизация
export class PluginDataSourceMainServer extends Plugin {
  async handleSyncMessage(message) {
    if (message.type === 'syncCollection') {
      await this.app.db.getRepository('collections').load(message.collectionName);
    }
  }

  private sendSchemaChange(data, options) {
    this.sendSyncMessage(data, options); // Автоматически вызывает app.syncMessageManager.publish
  }
}

#Менеджер публикации/подписки (PubSubManager)

Широковещательная рассылка сообщений является базовым компонентом синхронных сигналов и также может использоваться напрямую. Когда вам нужно транслировать сообщения между экземплярами, вы можете реализовать это с помощью данного компонента.

  • app.pubSubManager.subscribe(channel, handler, { debounce }) позволяет подписываться на канал между экземплярами; опция debounce используется для устранения дребезга, предотвращая частые обратные вызовы, вызванные повторной трансляцией.
  • Метод publish поддерживает skipSelf (по умолчанию true) и onlySelf для управления тем, будет ли сообщение отправлено обратно текущему экземпляру.
  • Адаптер (например, Redis, RabbitMQ и т. д.) должен быть настроен до запуска приложения; в противном случае по умолчанию не будет установлено соединение с внешней системой обмена сообщениями.

Пример: plugin-async-task-manager использует PubSub для трансляции событий отмены задач

Трансляция
const channel = `${plugin.name}.task.cancel`;

await this.app.pubSubManager.subscribe(channel, async ({ id }) => {
  this.logger.info(`Task ${id} cancelled on other node`);
  await this.stopLocalTask(id);
});

await this.app.pubSubManager.publish(channel, { id: taskId }, { skipSelf: true });

#Компонент очереди событий (EventQueue)

Очередь сообщений используется для планирования асинхронных задач и подходит для обработки длительных или повторяемых операций.

  • Объявите потребителя с помощью app.eventQueue.subscribe(channel, { idle, process, concurrency }). Метод process возвращает Promise, и вы можете использовать AbortSignal.timeout для контроля времени ожидания.
  • Метод publish автоматически добавляет префикс имени приложения и поддерживает такие опции, как timeout и maxRetries. По умолчанию он использует адаптер очереди в памяти, но при необходимости может быть переключен на расширенные адаптеры, такие как RabbitMQ.
  • В кластере убедитесь, что все узлы используют один и тот же адаптер, чтобы избежать фрагментации задач между узлами.

Пример: plugin-async-task-manager использует EventQueue для планирования задач

Распределение
this.app.eventQueue.subscribe(`${plugin.name}.task`, {
  concurrency: this.concurrency,
  idle: this.idle,
  process: async (payload, { signal }) => {
    await this.runTask(payload.id, { signal });
  },
});

await this.app.eventQueue.publish(`${plugin.name}.task`, { id: taskId }, { maxRetries: 3 });

#Менеджер распределенных блокировок (LockManager)

Когда необходимо избежать состояний гонки, вы можете использовать распределенную блокировку для сериализации доступа к ресурсу.

  • По умолчанию предоставляется адаптер local на основе процессов. Вы можете зарегистрировать распределенные реализации, такие как Redis; управляйте параллелизмом с помощью app.lockManager.runExclusive(key, fn, ttl) или acquire/tryAcquire.
  • ttl используется для гарантированного снятия блокировки, предотвращая ее постоянное удержание в исключительных ситуациях.
  • Типичные сценарии включают: изменение схемы, предотвращение дублирования задач, ограничение скорости запросов и т. д.

Пример: plugin-data-source-main использует распределенную блокировку для защиты процесса удаления полей

Сериализация
const lockKey = `${this.name}:fields.beforeDestroy:${collectionName}`;
await this.app.lockManager.runExclusive(lockKey, async () => {
  await fieldModel.remove(options);
  this.sendSyncMessage({ type: 'removeField', collectionName, fieldName });
});

#Рекомендации по разработке

  • Согласованность состояния в памяти: Старайтесь избегать использования состояния в памяти при разработке. Вместо этого используйте кэширование или синхронные сообщения для поддержания согласованности состояния.
  • Приоритет повторного использования встроенных интерфейсов: Используйте унифицированные возможности, такие как app.cache, app.syncMessageManager и другие, чтобы избежать повторной реализации логики межузлового взаимодействия в плагинах.
  • Внимание к границам транзакций: Операции с транзакциями должны использовать transaction.afterCommit (встроенный в syncMessageManager.publish) для обеспечения согласованности данных и сообщений.
  • Разработайте стратегию отката (backoff): Для задач в очередях и широковещательных задач разумно устанавливайте значения timeout, maxRetries и debounce, чтобы предотвратить новые пики трафика в исключительных ситуациях.
  • Сопутствующий мониторинг и логирование: Эффективно используйте журналы приложения для записи имен каналов, содержимого сообщений, ключей блокировок и другой информации, чтобы упростить устранение периодически возникающих проблем в кластере.

Благодаря этим возможностям плагины могут безопасно обмениваться состоянием, синхронизировать конфигурации и планировать задачи между различными экземплярами, отвечая требованиям стабильности и согласованности в сценариях кластерного развертывания.