Держите ваши индексы актуальными с помощью этого реального пайплайна

Поддерживайте ваши индексы актуальными с помощью этого реального конвейера

by LJ 7 апреля 2025
tldt arrow

Слишком сложно; не читал

CocoIndex постоянно отслеживает изменения источника и поддерживает синхронизацию производных данных с минимальной задержкой и минимальными затратами на производительность.

Сегодня мы рады объявить о поддержке непрерывных обновлений для долгосрочных конвейеров в CocoIndex. Эта мощная функция автоматически применяет инкрементальные изменения источника, чтобы ваш индекс оставался актуальным с минимальной задержкой.

С помощью непрерывных обновлений ваши индексы остаются синхронизированными с вашими исходными данными в реальном времени, что гарантирует, что ваши приложения всегда имеют доступ к самой последней информации без затрат на полную переиндексацию.

Если вам нравится наша работа, нам было бы очень приятно, если бы вы поддержали Cocoindex на Github звездочкой. Большое спасибо за теплые обнимашки с кокосом 🥥🤗.

Когда это использовать?

Это подходит для ситуаций, когда вам необходимо постоянно получать свежие целевые данные.

Как это работает?

Система непрерывно захватывает изменения из источника и обновляет целевые данные соответствующим образом. Она работает долго и прекращает работу только по явной отмене.

Ресурс данных может включать один или несколько механизмов захвата изменений:

CocoIndex поддерживает две основные категории механизмов обнаружения изменений:

  1. Общий механизм:
    • Интервал обновления: Универсальный подход, применимый ко всем источникам данных, позволяющий периодически проверять изменения путем сканирования и сравнения текущего состояния с предыдущим состоянием.
  2. Специфические для источника механизмы:
    • Уведомление об изменении Push: Многие источники данных предлагают встроенные механизмы для уведомления об изменениях в реальном времени, на которые CocoIndex может подписываться (в ближайшее время!)
    • Опрос недавних изменений: Некоторые источники данных предоставляют API для отслеживания недавно измененных записей, что позволяет эффективно обнаруживать изменения; например, Google Drive.

Эти механизмы работают совместно, чтобы гарантировать, что CocoIndex может обнаруживать и обрабатывать изменения по мере их появления, поддерживая ваш индекс в идеальной синхронизации с исходными данными с минимальной задержкой и потреблением ресурсов.

Под капотом, после обнаружения изменения, CocoIndex будет использовать свой механизм инкрементальной обработки для обновления целевых данных.

Как включить непрерывные обновления?

Вот пример того, как включить непрерывные обновления для Google Drive. Это довольно просто:

1. Настройте механизмы захвата изменений для источника

@cocoindex.flow_def(name="GoogleDriveIndex")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.GoogleDrive(
            service_account_credential_path=credential_path,
            root_folder_ids=root_folder_ids,
            recent_changes_poll_interval=datetime.timedelta(seconds=10)),
        refresh_interval=datetime.timedelta(minutes=1))

В этом примере мы настроили два механизма обнаружения изменений:

  1. recent_changes_poll_interval=datetime.timedelta(seconds=10): Это механизм, специфичный для Google Drive, который использует конечную точку изменений API Drive для эффективного обнаружения модификаций каждые 10 секунд. Это эффективное быстрое сканирование для захвата всех последних измененных файлов, и мы можем установить его на короткий интервал, чтобы получить более свежие данные. Он не захватывает удаления файлов, поэтому нам нужен механизм резервного копирования, чтобы гарантировать, что все изменения в конечном итоге будут захвачены.

  2. refresh_interval=datetime.timedelta(minutes=1): Это универсальный механизм резерва, который выполняет полное сканирование источника данных каждую минуту. Это нужно для сканирования всех файлов, чтобы гарантировать захват всех изменений, включая удаленные файлы.

    Параметр refresh_interval особенно важен, поскольку он служит защитной сеткой, чтобы гарантировать, что все изменения в конечном итоге будут захвачены, даже если специфические для источника механизмы что-то пропустят. Он работает путем:

    • Периодического сканирования списка записей (например, списка файлов с метаданными, такими как время последнего изменения) с заданным интервалом
    • Сравнения текущего списка с ранее известным списком
    • Выявления любых различий (добавления, модификации, удаления)
    • Срабатывания обновлений только для измененных элементов

Хотя специфические для источника механизмы, такие как recent_changes_poll_interval, более эффективны для почти реального времени обновлений, refresh_interval обеспечивает полное покрытие. Мы рекомендуем устанавливать его на разумное значение в зависимости от ваших требований к свежести и ограничений ресурсов – более короткие интервалы обеспечивают более свежие данные, но требуют больше ресурсов.

Вы можете прочитать полную документацию:

  • для специфических для источника механизмов обнаружения изменений здесь. У разных источников поддерживаются разные механизмы.
  • для универсального резервного механизма здесь.

2. Запустите поток в режиме живого обновления

Вариант 1: CLI

Добавьте декоратор @cocoindex.main_fn() к вашей основной функции, чтобы CLI CocoIndex взял на себя управление (когда cocoindex является первым аргументом командной строки).

@cocoindex.main_fn()
def main():
    pass

if __name__ == "__main__":
  main()

Чтобы запустить CLI в режиме живого обновления, вы можете использовать следующую команду:

python main.py cocoindex update -L

Это запустит поток в режиме живого обновления, который будет постоянно захватывать изменения из источника и обновлять целевые данные соответствующим образом.

Вариант 2: Python API

Вы можете создать cocoindex.FlowLiveUpdater. Например,

@cocoindex.main_fn()
async def main():
    my_updater = cocoindex.FlowLiveUpdater(demo_flow)
    await my_updater.wait()
    ...

if __name__ == "__main__":
    asyncio.run(main())

И вы запускаете поток с помощью

python main.py

Вы также можете использовать обновления как менеджер контекста. Он будет автоматически завершать работу и ждать окончания обновления, когда контекст будет завершен. Полная документация здесь.

Теперь вы полностью готовы! Начать очень просто и иметь непрерывные обновления для ваших данных. Начните сейчас с руководства по быстрому старту 🚀

Нам было бы очень приятно, если бы вы поддержали Cocoindex на Github звездочкой, если вам нравится наша работа. Большое спасибо за теплые обнимашки с кокосом 🥥🤗.

Об авторе

LJ@badmonster0

Подписывайтесь

Хакер, Строитель, Основатель, CocoIndexЧитайте мои историиУзнайте больше

ТЕМЫ

программирование#непрерывная-интеграция#искусственный-интеллект#наука-данных#инженерия-данных#etl-инструменты#данные-в-реальном-времени#индексация-данных#синхронизация-данных-в-реальном-времени

ЭТА СТАТЬЯ БЫЛА ОПУБЛИКОВАНА В…

TerminalLiteHackernoonBskyПрисоединяйтесь к HackerNoon
Перейти к источнику
AI Daily

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *