map
docarray.utils.map
map_docs(docs, func, backend='thread', num_worker=None, pool=None, show_progress=False)
Return an iterator that applies func to every Document in docs in parallel,
yielding the results.
from docarray import DocList
from docarray.documents import ImageDoc
from docarray.utils.map import map_docs
def load_url_to_tensor(img: ImageDoc) -> ImageDoc:
img.tensor = img.url.load()
return img
url = (
'https://upload.wikimedia.org/wikipedia/commons/8/80/'
'Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg'
)
docs = DocList[ImageDoc]([ImageDoc(url=url) for _ in range(100)])
docs = DocList[ImageDoc](
list(map_docs(docs, load_url_to_tensor, backend='thread'))
) # threading is usually a good option for IO-bound tasks such as loading an
# ImageDoc from url
for doc in docs:
assert doc.tensor is not None
Parameters:
Returns:
| Type | Description |
|---|---|
Generator[T_doc, None, None]
|
yield Documents returned from |
Source code in docarray/utils/map.py
map_docs_batched(docs, func, batch_size, backend='thread', num_worker=None, shuffle=False, pool=None, show_progress=False)
Return an iterator that applies func to every minibatch of iterable in parallel,
yielding the results.
Each element in the returned iterator is an AnyDocArray.
from docarray import BaseDoc, DocList
from docarray.utils.map import map_docs_batched
class MyDoc(BaseDoc):
name: str
def upper_case_name(docs: DocList[MyDoc]) -> DocList[MyDoc]:
docs.name = [n.upper() for n in docs.name]
return docs
batch_size = 16
docs = DocList[MyDoc]([MyDoc(name='my orange cat') for _ in range(100)])
it = map_docs_batched(docs, upper_case_name, batch_size=batch_size)
for i, d in enumerate(it):
docs[i * batch_size : (i + 1) * batch_size] = d
assert len(docs) == 100
print(docs.name[:3])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
docs |
T
|
DocList to apply function to |
required |
batch_size |
int
|
Size of each generated batch (except the last one, which might be smaller). |
required |
shuffle |
bool
|
If set, shuffle the Documents before dividing into minibatches. |
False
|
func |
Callable[[T], Union[T, T_doc]]
|
a function that takes an :class: |
required |
backend |
str
|
|
'thread'
|
num_worker |
Optional[int]
|
the number of parallel workers. If not given, then the number of CPUs in the system will be used. |
None
|
show_progress |
bool
|
show a progress bar |
False
|
pool |
Optional[Union[Pool, ThreadPool]]
|
use an existing/external pool. If given, |
None
|
Returns:
| Type | Description |
|---|---|
Generator[Union[T, T_doc], None, None]
|
yield DocLists returned from |
