
How to create a custom Document Loader

Overview

LLM applications often need to extract data from databases or files, such as PDFs, and convert it into a format the LLM can use. In LangChain, this usually means creating Document objects, which hold the extracted text (page_content) together with metadata, a dictionary of details about the document such as the author's name or the publication date.

Document objects are often formatted into prompts that are fed into an LLM, allowing the LLM to use the information in the Document to generate a desired response (e.g., summarizing the document). Documents can be either used immediately or indexed into a vectorstore for future retrieval and use.

The main abstractions for Document Loading are:

| Component | Description |
| --- | --- |
| Document | Contains text and metadata |
| BaseLoader | Used to convert raw data into Documents |
| Blob | A representation of binary data that's located either in a file or in memory |
| BaseBlobParser | Logic to parse a Blob to yield Document objects |

This guide will demonstrate how to write custom document loading and file parsing logic; specifically, we'll see how to:

  1. Create a standard document Loader by sub-classing from BaseLoader.
  2. Create a parser using BaseBlobParser and use it in conjunction with Blob and BlobLoaders. This is useful primarily when working with files.

Standard Document Loader

A document loader can be implemented by sub-classing from a BaseLoader which provides a standard interface for loading documents.

Interface

| Method Name | Explanation |
| --- | --- |
| lazy_load | Used to load documents one by one lazily. Use for production code. |
| alazy_load | Async variant of lazy_load. |
| load | Used to load all the documents into memory eagerly. Use for prototyping or interactive work. |
| aload | Async variant of load: loads all the documents into memory eagerly. Use for prototyping or interactive work. Added to LangChain in 2024-04. |

  • The load method is a convenience method meant solely for prototyping work; it just invokes list(self.lazy_load()).
  • The alazy_load method has a default implementation that delegates to lazy_load. If you're using async, we recommend overriding the default implementation and providing a native async implementation.
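
The relationship between these four methods can be sketched with a simplified stand-in class. This is only an illustration of the delegation pattern described above, not LangChain's actual source: `SketchLoader` and `ThreeItems` are hypothetical names, plain strings stand in for Document objects, and the real default `alazy_load` may hand work off to an executor rather than iterating inline.

```python
import asyncio
from typing import AsyncIterator, Iterator, List


class SketchLoader:
    """Hypothetical stand-in that mimics how the four methods relate."""

    def lazy_load(self) -> Iterator[str]:
        # Subclasses implement this as a generator.
        raise NotImplementedError

    def load(self) -> List[str]:
        # Eager convenience wrapper: materialize the whole generator.
        return list(self.lazy_load())

    async def alazy_load(self) -> AsyncIterator[str]:
        # Default async variant: re-yield items from the sync generator.
        for item in self.lazy_load():
            yield item

    async def aload(self) -> List[str]:
        # Eager async wrapper around alazy_load.
        return [item async for item in self.alazy_load()]


class ThreeItems(SketchLoader):
    """Hypothetical subclass that only overrides lazy_load."""

    def lazy_load(self) -> Iterator[str]:
        yield from ("a", "b", "c")


loader_sketch = ThreeItems()
sync_result = loader_sketch.load()
async_result = asyncio.run(loader_sketch.aload())
```

Because only `lazy_load` is overridden, the other three methods come for free; a native async override of `alazy_load` would matter mainly when the underlying I/O is itself asynchronous.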
Important

When implementing a document loader do NOT provide parameters via the lazy_load or alazy_load methods.

All configuration is expected to be passed through the initializer (__init__). LangChain made this design choice so that a document loader, once instantiated, has all the information it needs to load documents.
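
As a rough illustration of that rule, the sketch below keeps every option on the instance and leaves `lazy_load` free of parameters. It uses a plain class that yields dictionaries so that it runs without LangChain installed; in real code you would subclass BaseLoader and yield Document objects. The class name `ConfiguredLineLoader` and its `skip_blank` option are hypothetical.

```python
import os
import tempfile
from typing import Dict, Iterator


class ConfiguredLineLoader:
    """Hypothetical loader: all configuration is captured in __init__."""

    def __init__(
        self, file_path: str, *, encoding: str = "utf-8", skip_blank: bool = False
    ) -> None:
        self.file_path = file_path
        self.encoding = encoding
        self.skip_blank = skip_blank

    def lazy_load(self) -> Iterator[Dict[str, object]]:  # <-- no parameters here
        with open(self.file_path, encoding=self.encoding) as f:
            for line_number, line in enumerate(f):
                if self.skip_blank and not line.strip():
                    continue  # behavior is driven by instance state only
                yield {"page_content": line, "line_number": line_number}


# Usage: write a small file, then load it with blank lines skipped.
with tempfile.NamedTemporaryFile(
    "w", suffix=".txt", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("first\n\nthird\n")
    tmp_path = tmp.name

records = list(ConfiguredLineLoader(tmp_path, skip_blank=True).lazy_load())
os.remove(tmp_path)
```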

Installation

Install langchain_core and langchain_community.

%pip install -qqU langchain_core langchain_community

Implementation

Let's create an example of a standard document loader that loads a file and creates a document from each line in the file.

from typing import AsyncIterator, Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class CustomDocumentLoader(BaseLoader):
    """An example document loader that reads a file line by line."""

    def __init__(self, file_path: str) -> None:
        """Initialize the loader with a file path.

        Args:
            file_path: The path to the file to load.
        """
        self.file_path = file_path

    def lazy_load(self) -> Iterator[Document]:  # <-- Does not take any arguments
        """A lazy loader that reads a file line by line.

        When you're implementing lazy load methods, you should use a generator
        to yield documents one by one.
        """
        with open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

    # alazy_load is OPTIONAL.
    # If you leave out the implementation, a default implementation which delegates to lazy_load will be used!
    async def alazy_load(
        self,
    ) -> AsyncIterator[Document]:  # <-- Does not take any arguments
        """An async lazy loader that reads a file line by line."""
        # Requires aiofiles
        # Install with `pip install aiofiles`
        # https://github.com/Tinche/aiofiles
        import aiofiles

        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

Test 🧪

To test out the document loader, we need a file with some quality content.

with open("./meow.txt", "w", encoding="utf-8") as f:
    quality_content = "meow meow🐱 \n meow meow🐱 \n meow😻😻"
    f.write(quality_content)

loader = CustomDocumentLoader("./meow.txt")

%pip install -q aiofiles

## Test out the lazy load interface
for doc in loader.lazy_load():
    print()
    print(type(doc))
    print(doc)

<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱
' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱
' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}

## Test out the async implementation
async for doc in loader.alazy_load():
    print()
    print(type(doc))
    print(doc)

<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱
' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱
' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
Tip

load() can be helpful in an interactive environment such as a Jupyter notebook.

Avoid using it for production code since eager loading assumes that all the content can fit into memory, which is not always the case, especially for enterprise data.

loader.load()
[Document(metadata={'line_number': 0, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
 Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
 Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow😻😻')]
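
For production code, one way to keep memory bounded is to consume the lazy iterator in fixed-size batches instead of calling load(). The helper below is a generic sketch and not part of LangChain: `batched` is a hypothetical name, and the generator of strings stands in for `loader.lazy_load()`.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items without materializing the input."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls only the next `size` items from the iterator.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for loader.lazy_load(): any generator works the same way.
fake_documents = (f"doc-{i}" for i in range(5))
batches = list(batched(fake_documents, size=2))
```

At most one batch is held in memory at a time, so the batch size, rather than the size of the data source, determines peak memory use.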

Working with Files

Many document loaders involve parsing files. The difference between such loaders usually stems from how the file is parsed, rather than how the file is loaded. For example, you can use open to read the binary content of either a PDF or a markdown file, but you need different parsing logic to convert that binary data into text.

As a result, it can be helpful to decouple the parsing logic from the loading logic, which makes it easier to re-use a given parser regardless of how the data was loaded.
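
The idea can be sketched in plain Python before bringing in LangChain's abstractions: the parser below only sees a binary stream, so the same function serves data already in memory and data read from disk. `parse_lines` is a hypothetical helper, and dictionaries stand in for Document objects.

```python
import io
import os
import tempfile
from typing import BinaryIO, Dict, Iterator, Optional


def parse_lines(
    stream: BinaryIO, source: Optional[str] = None
) -> Iterator[Dict[str, object]]:
    """Parsing logic only: turn a binary stream into one record per line."""
    for line_number, raw in enumerate(stream, start=1):
        yield {
            "page_content": raw.decode("utf-8"),
            "metadata": {"line_number": line_number, "source": source},
        }


payload = b"first line\nsecond line"

# Loading strategy 1: the bytes are already in memory.
from_memory = list(parse_lines(io.BytesIO(payload)))

# Loading strategy 2: the bytes come from a file on disk.
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
    tmp.write(payload)
with open(tmp.name, "rb") as f:
    from_disk = list(parse_lines(f, source=tmp.name))
os.remove(tmp.name)
```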

BaseBlobParser

A BaseBlobParser is an interface that accepts a blob and outputs a list of Document objects. A blob is a representation of data that lives either in memory or in a file. LangChain Python has a Blob primitive, which is inspired by the Blob Web API spec.

from typing import Iterator

from langchain_core.document_loaders import BaseBlobParser, Blob
from langchain_core.documents import Document


class MyParser(BaseBlobParser):
    """A simple parser that creates a document from each line."""

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a blob into a document line by line."""
        line_number = 0
        with blob.as_bytes_io() as f:
            for line in f:
                line_number += 1
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": blob.source},
                )
blob = Blob.from_path("./meow.txt")
parser = MyParser()
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
 Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
 Document(metadata={'line_number': 3, 'source': './meow.txt'}, page_content=' meow😻😻')]

Using the blob API also allows one to load content directly from memory without having to read it from a file!

blob = Blob(data=b"some data from memory\nmeow")
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': None}, page_content='some data from memory\n'),
Document(metadata={'line_number': 2, 'source': None}, page_content='meow')]

Blob

Let's take a quick look through some of the Blob API.

blob = Blob.from_path("./meow.txt", metadata={"foo": "bar"})
blob.encoding
'utf-8'
blob.as_bytes()
b'meow meow\xf0\x9f\x90\xb1 \n meow meow\xf0\x9f\x90\xb1 \n meow\xf0\x9f\x98\xbb\xf0\x9f\x98\xbb'
blob.as_string()
'meow meow🐱 \n meow meow🐱 \n meow😻😻'
blob.as_bytes_io()
<contextlib._GeneratorContextManager at 0x7f89cf9336d0>
blob.metadata
{'foo': 'bar'}
blob.source
'./meow.txt'

Blob Loaders

While a parser encapsulates the logic needed to parse binary data into documents, blob loaders encapsulate the logic that's necessary to load blobs from a given storage location.

At the moment, LangChain supports FileSystemBlobLoader and CloudBlobLoader.

You can use the FileSystemBlobLoader to load blobs and then use the parser to parse them.

from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader

filesystem_blob_loader = FileSystemBlobLoader(
    path=".", glob="*.mdx", show_progress=True
)

%pip install -q tqdm

parser = MyParser()
for blob in filesystem_blob_loader.yield_blobs():
    for doc in parser.lazy_parse(blob):
        print(doc)
        break
0it [00:00, ?it/s]

Alternatively, you can use CloudBlobLoader to load blobs from a cloud storage location (it supports the s3://, az://, gs://, and file:// schemes).

%pip install -q cloudpathlib[s3]

from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

cloud_blob_loader = CloudBlobLoader(
    url="https://ee-files.s3.amazonaws.com/files/106410/download/821953/7-http-s3.amazonaws.comrosco.pdf",
    glob="*.pdf",
    show_progress=True,
)
for blob in cloud_blob_loader.yield_blobs():
    print(blob)
Running this cell without cloud credentials configured fails. In the recorded run, which pointed the loader at an s3:// bucket, the traceback passed through cloudpathlib and botocore and ended with:

NoCredentialsError: Unable to locate credentials

Generic Loader

LangChain has a GenericLoader abstraction which composes a BlobLoader with a BaseBlobParser.

GenericLoader is meant to provide standardized classmethods that make it easy to use existing BlobLoader implementations. At the moment, the FileSystemBlobLoader and CloudBlobLoader are supported.

from langchain_community.document_loaders.generic import GenericLoader

generic_loader_filesystem = GenericLoader(
    blob_loader=filesystem_blob_loader, blob_parser=parser
)
for idx, doc in enumerate(generic_loader_filesystem.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")
0it [00:00, ?it/s]
... output truncated for demo purposes
from langchain_community.document_loaders.generic import GenericLoader

generic_loader_cloud = GenericLoader(blob_loader=cloud_blob_loader, blob_parser=parser)
for idx, doc in enumerate(generic_loader_cloud.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")

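Custom Generic Loader

If you prefer to work with classes, you can sub-class GenericLoader to bundle the loading and parsing logic together.

The sub-class associates a default parser with the loader, so you can load content through the existing classmethods, such as from_filesystem, without passing a parser each time.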

from typing import Any


class MyCustomLoader(GenericLoader):
    @staticmethod
    def get_parser(**kwargs: Any) -> BaseBlobParser:
        """Override this method to associate a default parser with the class."""
        return MyParser()


loader = MyCustomLoader.from_filesystem(path=".", glob="*.mdx", show_progress=True)

for idx, doc in enumerate(loader.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")
0it [00:00, ?it/s]
... output truncated for demo purposes
