注意
点击 此处 下载完整的示例代码
使用 Torchtext 预处理自定义文本数据集¶
**作者**:Anupam Sharma
本教程说明了在非内置数据集上使用 torchtext 的方法。在本教程中,我们将预处理一个数据集,该数据集可进一步用于训练用于机器翻译的序列到序列模型(例如,在本教程中:使用神经网络进行序列到序列学习),但无需使用旧版本的 torchtext。
在本教程中,我们将学习如何
读取数据集
分词句子
对句子应用转换
执行桶批处理
假设我们需要准备一个数据集来训练一个可以执行英语到德语翻译的模型。我们将使用由 Tatoeba 项目 提供的制表符分隔的德语-英语句子对,可以从 此链接 下载。
其他语言的句子对可以在 此链接 中找到。
设置¶
首先,下载数据集,解压缩 zip 文件,并记下文件 deu.txt 的路径。
确保安装了以下软件包
这里,我们使用 Spacy 对文本进行分词。简单来说,分词就是将句子转换为单词列表。Spacy 是一个用于各种自然语言处理 (NLP) 任务的 Python 包。
从 Spacy 下载英文和德文模型,如下所示
python -m spacy download en_core_web_sm
python -m spacy download de_core_news_sm
让我们从导入所需的模块开始
import torchdata.datapipes as dp
import torchtext.transforms as T
import spacy
from torchtext.vocab import build_vocab_from_iterator
eng = spacy.load("en_core_web_sm") # Load the English model to tokenize English text
de = spacy.load("de_core_news_sm") # Load the German model to tokenize German text
现在我们将加载数据集
FILE_PATH = 'data/deu.txt'
data_pipe = dp.iter.IterableWrapper([FILE_PATH])
data_pipe = dp.iter.FileOpener(data_pipe, mode='rb')
data_pipe = data_pipe.parse_csv(skip_lines=0, delimiter='\t', as_tuple=True)
在上面的代码块中,我们正在执行以下操作
在第 2 行,我们创建了一个文件名迭代器
在第 3 行,我们将迭代器传递给 FileOpener,然后以读取模式打开文件
在第 4 行,我们调用一个函数来解析文件,该函数再次返回一个元组迭代器,表示制表符分隔文件的每一行
DataPipes 可以被认为类似于数据集对象,我们可以在其上执行各种操作。查看 此教程 以了解有关 DataPipes 的更多详细信息。
我们可以验证迭代器是否具有句子对,如下所示
for sample in data_pipe:
print(sample)
break
请注意,我们还有属性详细信息以及句子对。我们将编写一个小函数来删除属性详细信息
def removeAttribution(row):
"""
Function to keep the first two elements in a tuple
"""
return row[:2]
data_pipe = data_pipe.map(removeAttribution)
上面代码块中第 6 行的 map 函数可用于对 data_pipe 的每个元素应用某些函数。现在,我们可以验证 data_pipe 只包含句子对。
for sample in data_pipe:
print(sample)
break
现在,让我们定义一些执行分词的函数
def engTokenize(text):
"""
Tokenize an English text and return a list of tokens
"""
return [token.text for token in eng.tokenizer(text)]
def deTokenize(text):
"""
Tokenize a German text and return a list of tokens
"""
return [token.text for token in de.tokenizer(text)]
上述函数接受文本并返回一个单词列表,如下所示
print(engTokenize("Have a good day!!!"))
print(deTokenize("Haben Sie einen guten Tag!!!"))
构建词汇表¶
让我们将英文句子作为源句子,德文句子作为目标句子。
词汇表可以被认为是我们数据集中唯一单词的集合。我们现在将为我们的源和目标构建词汇表。
让我们定义一个函数,从迭代器中元组的元素中获取标记。
def getTokens(data_iter, place):
"""
Function to yield tokens from an iterator. Since, our iterator contains
tuple of sentences (source and target), `place` parameters defines for which
index to return the tokens for. `place=0` for source and `place=1` for target
"""
for english, german in data_iter:
if place == 0:
yield engTokenize(english)
else:
yield deTokenize(german)
现在,我们将为源构建词汇表
source_vocab = build_vocab_from_iterator(
getTokens(data_pipe,0),
min_freq=2,
specials= ['<pad>', '<sos>', '<eos>', '<unk>'],
special_first=True
)
source_vocab.set_default_index(source_vocab['<unk>'])
上面的代码从迭代器构建词汇表。在上面的代码块中
在第 2 行,我们使用 place=0 调用 getTokens() 函数,因为我们需要源句子的词汇表。
在第 3 行,我们设置 min_freq=2。这意味着,该函数将跳过出现次数少于 2 次的单词。
在第 4 行,我们指定了一些特殊标记
<sos> 用于句子的开头
<eos> 用于句子的结尾
<unk> 用于未知单词。未知单词的一个例子是由于 min_freq=2 而被跳过的单词。
<pad> 是填充标记。在训练过程中,模型大多以批次进行训练。在一个批次中,可以有不同长度的句子。因此,我们用 <pad> 标记填充较短的句子,以使批次中所有序列的长度相等。
在第 5 行,我们设置 special_first=True。这意味着 <pad> 将获得索引 0,<sos> 索引 1,<eos> 索引 2,而 <unk> 将获得词汇表中的索引 3。
在第 7 行,我们将默认索引设置为 <unk> 的索引。这意味着如果某个单词不在词汇表中,我们将使用 <unk> 来代替该未知单词。
类似地,我们将为目标句子构建词汇表
target_vocab = build_vocab_from_iterator(
getTokens(data_pipe,1),
min_freq=2,
specials= ['<pad>', '<sos>', '<eos>', '<unk>'],
special_first=True
)
target_vocab.set_default_index(target_vocab['<unk>'])
请注意,以上示例显示了如何将特殊标记添加到我们的词汇表中。特殊标记可能会根据需求而改变。
现在,我们可以验证特殊标记是否放置在开头,然后是其他单词。在下面的代码中,source_vocab.get_itos() 返回一个列表,其中包含基于词汇表的索引处的标记。
print(source_vocab.get_itos()[:9])
使用词汇表将句子转换为数字¶
构建词汇表后,我们需要将我们的句子转换为相应的索引。让我们为此定义一些函数
def getTransform(vocab):
"""
Create transforms based on given vocabulary. The returned transform is applied to sequence
of tokens.
"""
text_tranform = T.Sequential(
## converts the sentences to indices based on given vocabulary
T.VocabTransform(vocab=vocab),
## Add <sos> at beginning of each sentence. 1 because the index for <sos> in vocabulary is
# 1 as seen in previous section
T.AddToken(1, begin=True),
## Add <eos> at beginning of each sentence. 2 because the index for <eos> in vocabulary is
# 2 as seen in previous section
T.AddToken(2, begin=False)
)
return text_tranform
现在,让我们看看如何使用上述函数。该函数返回一个 Transforms 对象,我们将在我们的句子上使用它。让我们取一个随机句子并检查转换是如何工作的。
temp_list = list(data_pipe)
some_sentence = temp_list[798][0]
print("Some sentence=", end="")
print(some_sentence)
transformed_sentence = getTransform(source_vocab)(engTokenize(some_sentence))
print("Transformed sentence=", end="")
print(transformed_sentence)
index_to_string = source_vocab.get_itos()
for index in transformed_sentence:
print(index_to_string[index], end=" ")
在上面的代码中,
在第 2 行,我们从第 1 行在 data_pipe 中创建的列表中获取一个源句子
在第 5 行,我们根据源词汇表获取一个转换并将其应用于分词后的句子。请注意,转换接受单词列表而不是句子。
在第 8 行,我们获得索引到字符串的映射,然后使用它获取转换后的句子
现在,我们将使用 DataPipe 函数将转换应用于我们所有的句子。让我们为此定义更多函数。
def applyTransform(sequence_pair):
"""
Apply transforms to sequence of tokens in a sequence pair
"""
return (
getTransform(source_vocab)(engTokenize(sequence_pair[0])),
getTransform(target_vocab)(deTokenize(sequence_pair[1]))
)
data_pipe = data_pipe.map(applyTransform) ## Apply the function to each element in the iterator
temp_list = list(data_pipe)
print(temp_list[0])
创建批次(使用桶批次)¶
通常,我们以批次训练模型。在处理序列到序列模型时,建议使批次中序列的长度相似。为此,我们将使用 data_pipe 的 bucketbatch 函数。
让我们定义一些将由 bucketbatch 函数使用的函数。
def sortBucket(bucket):
"""
Function to sort a given bucket. Here, we want to sort based on the length of
source and target sequence.
"""
return sorted(bucket, key=lambda x: (len(x[0]), len(x[1])))
现在,我们将应用 bucketbatch 函数
data_pipe = data_pipe.bucketbatch(
batch_size = 4, batch_num=5, bucket_num=1,
use_in_batch_shuffle=False, sort_key=sortBucket
)
在上面的代码块中
我们将批次大小设置为 4。
batch_num 是要保存在桶中的批次数量
bucket_num 是要保存在池中用于洗牌的桶的数量
sort_key 指定一个函数,该函数接受一个桶并对其进行排序
现在,让我们将源句子的批次视为 X,将目标句子的批次视为 y。通常,在训练模型时,我们对 X 的批次进行预测,并将结果与 y 进行比较。但是,我们 data_pipe 中的批次形式为 [(X_1,y_1), (X_2,y_2), (X_3,y_3), (X_4,y_4)]
print(list(data_pipe)[0])
因此,我们现在将它们转换为以下形式:((X_1,X_2,X_3,X_4), (y_1,y_2,y_3,y_4))。为此,我们将编写一个小函数
def separateSourceTarget(sequence_pairs):
"""
input of form: `[(X_1,y_1), (X_2,y_2), (X_3,y_3), (X_4,y_4)]`
output of form: `((X_1,X_2,X_3,X_4), (y_1,y_2,y_3,y_4))`
"""
sources,targets = zip(*sequence_pairs)
return sources,targets
## Apply the function to each element in the iterator
data_pipe = data_pipe.map(separateSourceTarget)
print(list(data_pipe)[0])
现在,我们拥有了所需的数据。
填充¶
如前面在构建词汇表时所讨论的,我们需要填充批次中较短的句子,以使批次中所有序列的长度相等。我们可以按如下方式执行填充
def applyPadding(pair_of_sequences):
"""
Convert sequences to tensors and apply padding
"""
return (T.ToTensor(0)(list(pair_of_sequences[0])), T.ToTensor(0)(list(pair_of_sequences[1])))
## `T.ToTensor(0)` returns a transform that converts the sequence to `torch.tensor` and also applies
# padding. Here, `0` is passed to the constructor to specify the index of the `<pad>` token in the
# vocabulary.
data_pipe = data_pipe.map(applyPadding)
现在,我们可以使用索引到字符串的映射来查看序列在使用标记而不是索引时的外观
source_index_to_string = source_vocab.get_itos()
target_index_to_string = target_vocab.get_itos()
def showSomeTransformedSentences(data_pipe):
"""
Function to show how the sentences look like after applying all transforms.
Here we try to print actual words instead of corresponding index
"""
for sources,targets in data_pipe:
if sources[0][-1] != 0:
continue # Just to visualize padding of shorter sentences
for i in range(4):
source = ""
for token in sources[i]:
source += " " + source_index_to_string[token]
target = ""
for token in targets[i]:
target += " " + target_index_to_string[token]
print(f"Source: {source}")
print(f"Traget: {target}")
break
showSomeTransformedSentences(data_pipe)
在上面的输出中,我们可以观察到较短的句子已用 <pad> 填充。现在,我们可以在编写训练函数时使用 data_pipe。
本教程的某些部分借鉴了 这篇文章。
脚本的总运行时间:(0 分钟 0.000 秒)