Getting started

Collate process (a.k.a. map and reduce)

In this first example we go through the steps involved in defining a parallelized pipeline that takes a batch of document objects, runs the preprocessing in parallel, limits the size of the passages, and finally collates the result into tensors that a model can consume.

This sequence of operations is very similar to a map/reduce operation, just with a few extra map steps, as sketched below.
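Conceptually, the three steps that follow behave like a small map/reduce program: two map steps (tokenization and passage generation) followed by a reduce-like collation step. The sketch below is illustrative only; the function arguments are hypothetical stand-ins for the warp_pipes pipes built in the rest of this section.

# illustrative sketch of the overall structure (not warp_pipes code);
# tokenize, generate_passages and collate stand in for the pipes defined below
def run_pipeline(batch, tokenize, generate_passages, collate):
    tokenized = tokenize(batch)              # map: raw text -> token ids
    passages = generate_passages(tokenized)  # map: documents -> fixed-size passages
    return collate(passages)                 # reduce: examples -> batch of tensors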

First we tokenize the batch of data in parallel.

import transformers
from warp_pipes.pipes import Parallel, Sequential, TokenizerPipe
from warp_pipes.pipes import HasPrefix

# example batch with a single document
batch = {
    "document.text": [
        """
        Fipple flutes are found in many cultures around the world.
        Often with six holes, the shepherd's pipe is a common pastoral image.
        Shepherds often piped both to soothe the sheep and to amuse themselves.
        Modern manufactured six-hole folk pipes are referred to as pennywhistle or 
        tin whistle. The recorder is a form of pipe, often used as a rudimentary 
        instructional musical instrument at schools, but versatile enough that 
        it is also used in orchestral music.
        """
    ],
    "title.text": ["Title: Pipe. "],
    "document.idx": [0],
}

# construct a pipe that tokenizes the document text and the title
tokenizer = transformers.AutoTokenizer.from_pretrained('bert-base-cased')
tokenizer_pipe = Parallel(
    Sequential(
        TokenizerPipe(
            tokenizer, key="text", field="document", 
            return_offsets_mapping=True, add_special_tokens=False, update=True),
        input_filter=HasPrefix("document")
    ),
    Sequential(
        TokenizerPipe(
            tokenizer, key="text", field="title", 
            return_offsets_mapping=True, add_special_tokens=False, update=True),
        input_filter=HasPrefix("title")
    ),
    update=True
)

# execute pipe by sending through the document batch
tokenized_batch = tokenizer_pipe(batch)
print(tokenized_batch)
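The Parallel pipe runs both branches on the same input and, with update=True, merges their outputs back into the original batch. Each branch is wrapped in a Sequential with an input_filter so that it only sees the keys belonging to its own field. As a rough illustration, assuming HasPrefix simply selects keys by their field prefix, the filtering step could be approximated like this:

# illustrative only: approximates what an input filter such as HasPrefix does,
# assuming keys are selected by their field prefix
def filter_by_prefix(batch: dict, prefix: str) -> dict:
    return {key: value for key, value in batch.items() if key.startswith(prefix)}

document_view = filter_by_prefix(batch, "document")  # document.text, document.idx
title_view = filter_by_prefix(batch, "title")        # title.text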

Then we generate passages in accordance with some specified constraints.

from warp_pipes.pipes.passages import GeneratePassages

# tokenizer and tokenized batch from the previous step
tokenizer = ...
tokenized_batch = ...

# generate final passages by setting the size and stride for a preprocessed batch
passages_pipe = GeneratePassages(
    size=30,
    stride=20,
    field="document",
    global_keys=["idx"],
    start_tokens=[tokenizer.cls_token_id],
    end_tokens=[tokenizer.sep_token_id],
    prepend_field="title",
)
passages = passages_pipe(tokenized_batch, header="Passages")
print(passages)
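Here size=30 and stride=20 mean that each generated passage holds at most 30 tokens and, assuming the stride denotes the step between the starts of consecutive passages, neighbouring passages overlap by roughly 10 tokens (before the start/end tokens and the prepended title field are accounted for). A minimal sketch of that sliding-window idea over a plain list of token ids, independent of the actual GeneratePassages implementation:

# illustrative sliding window over token ids (not the GeneratePassages implementation),
# assuming `stride` is the step between the starts of consecutive windows
def sliding_windows(token_ids, size, stride):
    starts = range(0, max(len(token_ids) - size + stride, 1), stride)
    return [token_ids[start:start + size] for start in starts]

# 100 tokens with size=30, stride=20 -> windows of length 30, 30, 30, 30, 20
print([len(window) for window in sliding_windows(list(range(100)), size=30, stride=20)])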

Finally we collate the documents into tensors.

from warp_pipes.pipes import CollateField
from warp_pipes.support.functional import get_batch_eg

# output of the GeneratePassages pipe from the previous step
passages = ...

# collate the document field and convert the listed keys to tensors
collate_docs = CollateField(
    field="document", 
    to_tensor=["idx", "input_ids", "attention_mask"]
)

# extract individual examples from the passage batch and collate them
egs = [get_batch_eg(passages, idx=i) for i in [0, 1, 2, 3]]
c_batch = collate_docs(egs)
print("output batch:", c_batch)
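For reference, get_batch_eg is used above to slice single examples out of the passage batch before handing them to the collate pipe, which stacks the selected fields back into tensors. A rough sketch of that round trip, assuming the batch is a plain dict of equal-length lists (the helper names here are hypothetical, not part of warp_pipes):

import torch

# illustrative only: approximate the example-extraction / collation round trip,
# assuming a batch is a dict mapping keys to equal-length lists
def get_example(batch: dict, idx: int) -> dict:
    return {key: values[idx] for key, values in batch.items()}

def collate_examples(egs: list, keys: list) -> dict:
    return {key: torch.tensor([eg[key] for eg in egs]) for key in keys}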