AI Document Data Pipelines with S3 or Azure Blob Storage

Document data pipelines automate a workflow in which text is extracted from documents added to a file share and an embedding model converts that text into vectors that AI/ML applications can use for semantic search and as context for large language model inference. The vector embeddings are stored alongside snippets of the original text in a vector database such as Postgres (with the pgvector extension) or AlloyDB, which supports the vector similarity scoring used for retrieval-augmented generation (RAG).
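To make the idea of similarity scoring concrete, here is a minimal sketch in plain Python. The three-dimensional vectors and the snippet texts are made up for illustration; a real embedding model emits vectors with hundreds of dimensions, and a vector database performs this ranking in SQL rather than in application code.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, rows, k=2):
    """Return the k text snippets whose embeddings are closest to the query."""
    ranked = sorted(
        rows,
        key=lambda row: cosine_similarity(query_vec, row[1]),
        reverse=True,
    )
    return [snippet for snippet, _ in ranked[:k]]

# Toy (snippet, embedding) pairs standing in for rows in a vector table.
rows = [
    ("Vacation policy: 20 days per year", [0.9, 0.1, 0.0]),
    ("Quarterly sales deck summary", [0.1, 0.9, 0.2]),
    ("Parental leave guidelines", [0.8, 0.2, 0.1]),
]
query = [1.0, 0.0, 0.0]  # pretend embedding of an HR-related question

print(top_k(query, rows))
```

The snippets returned by a ranking like this are what gets passed to the large language model as context.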

One of the major challenges of using data for AI applications is transporting the data from a source system to an intermediate location such as a data lake or data warehouse, and then on to a destination system with AI inference capabilities. An open source data integration platform such as Airbyte automates that movement, letting you reuse the data you already have for AI use cases like knowledge management, digital assistants, or sales and support automation. Airbyte supports hundreds of data sources and destinations through its connectors, enabling you to ask natural language questions about data from productivity and line-of-business applications using an AI chat platform such as BionicGPT.

The number of tools that a knowledge worker might use for productivity keeps growing, but almost everyone is familiar with the concept of a file share, whether it’s a traditional SMB/CIFS Windows share, a collaborative tool such as SharePoint, OneDrive, or Nextcloud, or an object storage service like Azure Blob Storage or Amazon S3. If your business’s unstructured data resides in any of these systems, Airbyte can be used to construct an ELT pipeline that transports the data to a vector database for RAG.

The “Document File Type Format” that Airbyte offers as an option when setting up a data stream for many of its connectors uses the Unstructured library to extract text from Markdown, plain text, PDF, Word, and PowerPoint documents. File shares or object storage buckets / blob containers containing CSV data are also supported through the “CSV Format”.

Our earlier piece about integrating Airbyte with BionicGPT addressed using Airbyte to ingest structured data from a relational database for a line-of-business app (e.g. HRM) into a RAG vector database. This article will instead talk about using Airbyte to create a pipeline for importing unstructured & semi-structured data into a Postgres vector database.

We will walk through an example ELT document pipeline that begins with users uploading .docx and .pptx documents into a Nextcloud External Storage mount backed by an S3 bucket.

Download PDF: Document Data Pipeline from Nextcloud to BionicGPT

  1. Nextcloud users upload documents to the S3 bucket through an External Storage mount.
  2. At the next scheduled sync, Airbyte picks up the documents added to the S3 bucket (configured as the Airbyte source) and performs text extraction with the Unstructured library.
  3. Airbyte notifies BionicGPT by inserting a message into the RabbitMQ message queue (configured as Airbyte destination).
  4. The BionicGPT cron job ingests the documents into the RAG vector database — PostgreSQL with the pgvector extension.
  5. Users can chat with the documents using a custom prompt and dataset in BionicGPT, and any large language model (locally hosted or remote).

Nextcloud’s support for mounting External Storages through the S3 API means that this document pipeline can be configured with any S3-compatible storage provider, including other cloud providers or on-prem object storage such as OpenStack Swift or MinIO. For simplicity, we will use Amazon S3 to demonstrate the ELT pipeline, but other options are available for more private and sovereign storage.

Create an S3 Bucket and configure IAM in AWS

There are some preparatory steps to perform through the AWS Management Console / CLI.

Create an AWS S3 bucket through the AWS CloudShell. The region that the bucket will be created in (e.g. us-west-2) will be displayed above the CloudShell terminal. You need to choose a bucket name that is globally unique, and not already used by other S3 users.

$ aws s3 mb s3://<bucket name>

Next, configure two custom IAM policies (one with read/write access and one with read-only access to the bucket), create an IAM user for each policy, and generate API keys.

Create “Allow-RO-S3-Bionic-Data” IAM user policy

Note the ARN of the policy, which is returned in the JSON output under the “Arn” key in the format “arn:aws:iam::<account id>:policy/<policy name>”, for example “arn:aws:iam::123456789000:policy/Allow-RO-S3-Bionic-Data”. It is required in a subsequent step to attach the policy to an IAM user.

$ aws iam create-policy \
--policy-name Allow-RO-S3-Bionic-Data \
--policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket name>",
                "arn:aws:s3:::<bucket name>/*"
            ]
        }
    ]
}'

Create “Allow-RW-S3-Bionic-Data” IAM user policy

$ aws iam create-policy \
--policy-name Allow-RW-S3-Bionic-Data \
--policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket name>",
                "arn:aws:s3:::<bucket name>/*"
            ]
        }
    ]
}'
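The two policy documents above differ only in their Action list, so they can be generated from one template to keep the bucket ARNs consistent. The following is a minimal sketch; the helper name `bucket_policy` and the bucket name `example-bionic-docs` are hypothetical, not part of the original setup.

```python
import json

def bucket_policy(bucket, actions):
    """Build an IAM policy document scoped to one bucket and its objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": [
                    f"arn:aws:s3:::{bucket}",    # the bucket itself (used by ListBucket)
                    f"arn:aws:s3:::{bucket}/*",  # every object inside the bucket
                ],
            }
        ],
    }

bucket = "example-bionic-docs"  # hypothetical bucket name
read_only = bucket_policy(bucket, ["s3:GetObject", "s3:ListBucket"])
read_write = bucket_policy(bucket, ["s3:*"])

# Print the read-only document; it has the same shape as the inline JSON above.
print(json.dumps(read_only, indent=4))
```

If you save the printed JSON to a file, the AWS CLI can read it with `--policy-document file://<file name>` instead of an inline string.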

Create IAM users, attach the appropriate policy, and generate API keys.

After running each aws iam create-access-key command, note the values of the “AccessKeyId” and “SecretAccessKey” JSON keys. You will need both key pairs: the read-write pair to mount the S3 bucket as an External Storage in Nextcloud, and the read-only pair to authorize Airbyte to access the bucket.

$ aws iam create-user --user-name bionic-data-s3-ro
$ aws iam attach-user-policy --user-name bionic-data-s3-ro --policy-arn arn:aws:iam::<account id>:policy/Allow-RO-S3-Bionic-Data
$ aws iam create-access-key --user-name bionic-data-s3-ro

$ aws iam create-user --user-name bionic-data-s3-rw
$ aws iam attach-user-policy --user-name bionic-data-s3-rw --policy-arn arn:aws:iam::<account id>:policy/Allow-RW-S3-Bionic-Data
$ aws iam create-access-key --user-name bionic-data-s3-rw
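If you redirect the output of each create-access-key command to a file, the two values can be pulled out programmatically instead of copied by hand. This is a minimal sketch: the sample JSON mirrors the shape of the command's output, but the key values are placeholders and the helper name `extract_credentials` is hypothetical.

```python
import json

# Shaped like the output of `aws iam create-access-key`; values are placeholders.
sample_output = """
{
    "AccessKey": {
        "UserName": "bionic-data-s3-ro",
        "AccessKeyId": "AKIAEXAMPLEKEYID0000",
        "Status": "Active",
        "SecretAccessKey": "exampleSecretAccessKeyPlaceholder0000000",
        "CreateDate": "2024-01-01T00:00:00+00:00"
    }
}
"""

def extract_credentials(cli_json):
    """Return the user name and key pair from create-access-key JSON output."""
    key = json.loads(cli_json)["AccessKey"]
    return {
        "user": key["UserName"],
        "access_key_id": key["AccessKeyId"],
        "secret_access_key": key["SecretAccessKey"],
    }

creds = extract_credentials(sample_output)
print(creds["user"], creds["access_key_id"])
```

The secret access key is only shown once at creation time, so store it somewhere safe before moving on.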

Mount the S3 Bucket as an External Storage in Nextcloud

As a user on your Nextcloud instance who belongs to the “admin” user group, navigate to Settings > Administration > External Storages or https://<Nextcloud URL>/settings/admin/externalstorages.

Enter the following values, using the access key of the read-write IAM user created previously. If you wish to restrict who can add files to the custom RAG dataset, it is advisable to create a Nextcloud user group (named “S3” in this walkthrough) and assign the authorized users to that group.

  • Folder name: Folder name that the External Storage should appear as in Nextcloud
  • External storage: Amazon S3
  • Authentication: Access key
  • Bucket name: created through aws s3 mb command
  • Hostname: s3.amazonaws.com
  • Storage Class: blank
  • Enable SSL: Yes
  • Enable Path Style: No
  • Legacy (v2) Authentication: No
  • Enable multipart copy: Yes
  • Access key: created through aws iam create-access-key command for read-write user
  • Secret key: created through aws iam create-access-key command for read-write user

If the External Storage mount to the S3 bucket was created correctly, it will appear in the Nextcloud dashboard for users who are part of the “S3” user group.

Create the Airbyte destination with the RabbitMQ connector

The next steps assume you already have a running Airbyte and RabbitMQ deployment as described in this article. If you do not have them set up yet, use the Airbyte Helm chart and RabbitMQ kubectl apply commands to get up and running. It is also essential to define the Bionic cron job that polls the RabbitMQ service for new messages every minute.

Through the RabbitMQ Management Panel, create the bionic-pipeline queue and bind it to the built-in “amq.topic” exchange with “*.bionic-pipeline” as the routing key, so that any message published to that exchange with a matching routing key is delivered to the queue.

In addition, through the Airbyte webapp, configure RabbitMQ as an Airbyte destination. For the routing_key setting, the string preceding “.bionic-pipeline” should be the API key generated by BionicGPT for the corresponding Document Pipeline.
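It helps to know how a topic exchange compares a routing key against a binding such as “*.bionic-pipeline”: keys are split into dot-separated words, “*” stands for exactly one word, and “#” stands for zero or more words. The sketch below re-implements that rule in Python purely for illustration (RabbitMQ does this matching itself); the API key `abc123` is a made-up value.

```python
def topic_matches(binding_key, routing_key):
    """AMQP topic matching: '*' is exactly one word, '#' is zero or more words."""

    def match(pattern, words):
        if not pattern:
            # Pattern exhausted: it matches only if no words are left over.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb any number of words, including none.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))

binding = "*.bionic-pipeline"
print(topic_matches(binding, "abc123.bionic-pipeline"))   # one word before the suffix
print(topic_matches(binding, "bionic-pipeline"))          # nothing before the suffix
print(topic_matches(binding, "team.abc123.bionic-pipeline"))  # two words before it
```

One consequence of the single-word wildcard is that the prefix used in the routing key must not itself contain a dot, or the message will not reach the queue.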

Create the Airbyte source with the S3 connector

Create an Airbyte source using the S3 connector so that Airbyte can pull down documents that are uploaded into the mounted folder in Nextcloud. Here are the settings in particular to pay attention to:

  • Bucket: Name of the S3 bucket
  • The list of streams to sync
    • Create a stream and select “Document File Type Format (Experimental)” in the dropdown menu if the documents to be embedded are in Markdown, text, PDF, Word, or PowerPoint format. Otherwise, select “CSV Format” if the document data is in CSV format.
  • Leave the Globs option set to the default “**” if you want to pull down all objects in the bucket.

You can configure more than one stream for an S3 source, for example, if you wanted to set up a stream for document file types and another stream for CSVs. In this case, you might use the glob option to pattern match for the appropriate file extensions for each stream. 

  • AWS Access Key ID: for read-only user
  • AWS Secret Access Key: for read-only user
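When splitting a bucket into a document stream and a CSV stream, it can be useful to check up front which stream each object would fall into. The sketch below is a rough stand-in that classifies object keys by file extension; it does not use Airbyte's glob engine, and the stream names, extension list, and object keys are assumptions chosen for illustration.

```python
from pathlib import PurePosixPath

# Assumed extensions for the formats named above (Markdown, text, PDF, Word, PowerPoint).
DOCUMENT_EXTENSIONS = {".md", ".txt", ".pdf", ".docx", ".pptx"}

def stream_for(key):
    """Return the hypothetical stream name an object key belongs to, or None."""
    suffix = PurePosixPath(key).suffix.lower()
    if suffix in DOCUMENT_EXTENSIONS:
        return "documents"
    if suffix == ".csv":
        return "csv"
    return None  # not picked up by either stream

# Made-up object keys as they might appear in the bucket.
keys = ["handbook/Onboarding.docx", "decks/Q1.PPTX", "exports/hr.csv", "logo.png"]
for key in keys:
    print(key, "->", stream_for(key))
```

In Airbyte itself the same split would be expressed with one glob per stream, each pattern selecting only the extensions that stream's format can parse.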

Create the Airbyte connection from S3 → RabbitMQ

Airbyte will fetch the stream(s) you defined when setting up the S3 data source and provide a list with toggle sliders to activate the streams you want to sync. You will most likely only have a single stream if you are only uploading documents supported by the text extraction capabilities of the Unstructured library.

You should also specify other parameters for the connection, including:

  • Connection name: S3 → RabbitMQ
  • Schedule type: Scheduled
  • Replication frequency: Every hour
  • Destination namespace: Destination default
  • Destination stream prefix: Mirror source name
  • Detect and propagate schema changes: Propagate field changes only

Provided that the connection is set up successfully, Airbyte will incrementally pull down documents from the S3 bucket at each scheduled sync and publish a message to the RabbitMQ queue for the changes it detects. Only new documents added to the S3 bucket are processed; data deleted from S3 is not removed from the Bionic dataset associated with the Document Pipeline.
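The add-only behavior can be pictured as a comparison between the object keys seen at the previous sync and the current bucket listing. This is a deliberately simplified model, not Airbyte's actual state tracking; the function name `plan_sync` and the object keys are hypothetical.

```python
def plan_sync(already_synced, current_listing):
    """Compare two sets of object keys and report what a sync would act on.

    Returns (new_keys, deleted_keys). Only new_keys are sent downstream;
    deleted_keys are computed here solely to show that nothing happens to them.
    """
    new_keys = sorted(set(current_listing) - set(already_synced))
    deleted_keys = sorted(set(already_synced) - set(current_listing))
    return new_keys, deleted_keys

previous = {"policies/leave.docx", "decks/q1.pptx"}
current = {"decks/q1.pptx", "decks/q2.pptx"}  # leave.docx deleted, q2.pptx added

new_keys, deleted_keys = plan_sync(previous, current)
print("to embed:", new_keys)
print("deleted in S3 but still in the dataset:", deleted_keys)
```

In practice this means that a document removed from the bucket has to be removed from the Bionic dataset separately if it should no longer be used as context.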

When the Bionic cron job receives a notification through RabbitMQ that there is new text extracted by Unstructured to download, the text files will flow through the document pipeline to the configured embedding model in Bionic to be converted and written into the Postgres vector database.

Embeddings may be generated locally using the default bge-small-en-v1.5 model running at the http://embeddings-api:80/openai endpoint, or by an external embedding provider, such as the thenlper/gte-large model on OctoAI at the https://text.octoai.run/v1 endpoint.

Different Team Datasets in BionicGPT can be configured to use different embedding models (and providers), depending on the cost, performance, and privacy requirements of the particular dataset. Likewise, different large language models (and providers) can be configured for each custom Prompt in BionicGPT based on the inference and reasoning abilities needed.

On the BionicGPT side, we set up two Team Datasets, Document Pipelines, and Prompts: one set for an S3 document pipeline (S3 Pipeline) and another for an Azure Blob Storage document pipeline (Azure Blob Pipeline).

Each document pipeline must be associated with a dataset, which determines where documents flowing through that pipeline are embedded.

On the Airbyte side, two separate sources (S3 and Azure Blob Storage) and two destinations (RabbitMQ and RabbitMQ 2) are configured. 

The first RabbitMQ destination feeds into the routing_key “<api key>.bionic-pipeline” and the second RabbitMQ destination feeds into the routing_key “<api key 2>.bionic-pipeline” where api key and api key 2 are the API keys displayed in the Document Pipelines section of the BionicGPT dashboard.

User workflow for S3 (via Nextcloud) or Azure Blob Storage (via Storage Explorer)

Here our Nextcloud user uploads a document destined for embedding to the S3 Dataset in Bionic through the mounted External Storage.

Alternatively, a user uploads a document destined for embedding to the Azure Blob Dataset in Bionic using Microsoft Azure Storage Explorer or any other compatible desktop client.

Once the hourly run of the Airbyte sync job completes for each pipeline, the uploaded document(s) will be embedded into the vector database for the matching dataset. You can manually execute a sync for a data pipeline in Airbyte by selecting its connection from the Connections list, switching to the Job History tab, and clicking the “Sync now” button.

A prompt must be created in BionicGPT whenever you wish to use a dataset with a particular LLM. This can be accomplished from the Prompts section of the Bionic dashboard. 

Finally, you will be able to select the prompts you have configured (RAG for the S3 Dataset and RAG 2 for the Azure Blob dataset) in the BionicGPT Chat Console and ask questions about the embedded documents using the configured LLMs and datasets. 

RAG Prompt with S3 Dataset and Llama 3 LLM
RAG 2 Prompt with Azure Blob Dataset and Llama 3 LLM