reference: https://github.com/karpathy/minbpe
base.py
"""
Contains the base Tokenizer class and a few common helper functions.
The base class also contains the (common) save/load functionality.
It would be possible to be a lot stricter about the interface and,
e.g., isolate all regex/pattern handling in the RegexTokenizer, but
some concessions are made for simplicity.
"""
import unicodedata
# a few helper functions useful for both BasicTokenizer and RegexTokenizer
def get_stats(ids, counts=None):
"""
    Given a list of integers, return a dictionary of counts of consecutive pairs
    Example: [1, 2, 3, 1, 2] -> {(1, 2): 2, (2, 3): 1, (3, 1): 1}
    Optionally allows updating an existing dictionary of counts
"""
counts = {} if counts is None else counts
for pair in zip(ids, ids[1:]): # iterate consecutive elements
counts[pair] = counts.get(pair, 0) + 1
return counts
def merge(ids, pair, idx):
"""
In the list of integers (ids), replace all consecutive occurrences
of pair with the new integer token idx
Example: ids = [1, 2, 3, 1, 2], pair=(1,2), idx=4 -> [4, 3, 4]
"""
newids = []
i = 0
while i < len(ids):
# if not at the very last position AND the pair matches, replace it
if ids[i] == pair[0] and i < len(ids)-1 and ids[i+1] == pair[1]:
newids.append(idx)
i += 2
        else:
            newids.append(ids[i])
            i += 1
    return newids
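# a quick doctest-style illustration of the two helpers above:
# >>> get_stats([1, 2, 3, 1, 2])
# {(1, 2): 2, (2, 3): 1, (3, 1): 1}
# >>> merge([1, 2, 3, 1, 2], (1, 2), 4)
# [4, 3, 4]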
# two more helper functions, used for pretty printing tokens (e.g. in save() below)...
def replace_control_characters(s: str) -> str:
# we don't want to print control characters
# which distort the output (e.g. \n or much worse)
# https://stackoverflow.com/questions/4324790/removing-control-characters-from-a-string-in-python/19016117#19016117
# http://www.unicode.org/reports/tr44/#GC_Values_Table
chars = []
for ch in s:
if unicodedata.category(ch)[0] != "C":
chars.append(ch) # this character is OK
else:
chars.append(f"\\u{ord(ch):04x}") # escape
return "".join(chars)
def render_token(t: bytes) -> str:
# pretty print a token, escaping control characters
s = t.decode('utf-8', errors='replace')
s = replace_control_characters(s)
return s
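# e.g. a newline (U+000A, Unicode category Cc) is escaped rather than printed:
# >>> print(render_token("hi\nthere".encode("utf-8")))
# hi\u000athere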
# the base Tokenizer class
class Tokenizer:
"""
Base class for Tokenizers
"""
def __init__(self):
# default: vocab size of 256 (all bytes), no merges, no patterns
        self.merges = {} # (int, int) -> int
self.pattern = "" # str
self.special_tokens = {} # str -> int, e.g. {'<|endoftext|>': 100257}
self.vocab = self._build_vocab() # int -> bytes
def train(self, text, vocab_size, verbose=False):
# Tokenizer can train a vocabulary of size vocab_size from text
raise NotImplementedError
def encode(self, text):
# Tokenizer can encode a string into a list of integers
raise NotImplementedError
def decode(self, ids):
        # Tokenizer can decode a list of integers into a string
raise NotImplementedError
def _build_vocab(self):
# vocab is simply and deterministically derived from merges
vocab = {idx: bytes([idx]) for idx in range(256)}
for (p0,p1), idx in self.merges.items():
vocab[idx] = vocab[p0] + vocab[p1]
for special, idx in self.special_tokens.items():
vocab[idx] = special.encode("utf-8")
return vocab
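    # e.g. (illustrative): if merges == {(101, 32): 256}, then _build_vocab() gives
    # vocab[256] == vocab[101] + vocab[32] == b"e" + b" " == b"e "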
def save(self, file_prefix):
"""
        Saves two files: file_prefix.vocab and file_prefix.model
        This is inspired by (but not equivalent to!) sentencepiece's model saving:
- model file is the critical one, intended for load()
- vocab file is just a pretty printed version for human inspection only
"""
# write the model: to be used in load() later
model_file = file_prefix + ".model"
with open(model_file, 'w') as f:
# write the version, pattern and merges, that's all that's needed
f.write("minbpe v1\n")
f.write(f"{self.pattern}\n")
# write the special tokens, first the number of them, then each one
f.write(f"{len(self.special_tokens)}\n")
for special, idx in self.special_tokens.items():
f.write(f"{special} {idx}\n")
# the merges dict
for idx1, idx2 in self.merges:
f.write(f"{idx1} {idx2}\n")
# write the vocab: for the human to look at
vocab_file = file_prefix + ".vocab"
inverted_merges = {idx: pair for pair, idx in self.merges.items()}
with open(vocab_file, "w", encoding="utf-8") as f:
for idx, token in self.vocab.items():
# note: many tokens may be partial utf-8 sequences
# and cannot be decoded into valid strings. Here we're using
# errors='replace' to replace them with the replacement char �.
# this also means that we couldn't possibly use .vocab in load()
# because decoding in this way is a lossy operation!
s = render_token(token)
# find the children of this token, if any
if idx in inverted_merges:
# if this token has children, render it nicely as a merge
idx0, idx1 = inverted_merges[idx]
s0 = render_token(self.vocab[idx0])
s1 = render_token(self.vocab[idx1])
f.write(f"[{s0}][{s1}] -> [{s}] {idx}\n")
else:
                    # otherwise this is a leaf token, just print it
# (this should just be the first 256 tokens, the bytes)
f.write(f"[{s}] {idx}\n")
def load(self, model_file):
"""
Inverse of save() but only for the model file
"""
assert model_file.endswith(".model")
# read the model file
merges = {}
special_tokens = {}
idx = 256
with open(model_file, 'r', encoding="utf-8") as f:
# read the version
version = f.readline().strip()
assert version == "minbpe v1"
# read the pattern
self.pattern = f.readline().strip()
            # read the special tokens
num_special = int(f.readline().strip())
for _ in range(num_special):
special, special_idx = f.readline().strip().split()
special_tokens[special] = int(special_idx)
# read the merges
for line in f:
idx1, idx2 = map(int, line.split())
merges[(idx1, idx2)] = idx
idx += 1
self.merges = merges
self.special_tokens = special_tokens
        self.vocab = self._build_vocab()
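A minimal save/load round trip looks like this (a sketch, not part of base.py; it uses BasicTokenizer from basic.py below, and the models/demo paths are hypothetical):

import os
from minbpe import BasicTokenizer

os.makedirs("models", exist_ok=True)
tok = BasicTokenizer()
tok.train("hello hello world", vocab_size=259)  # 256 byte tokens + 3 merges
tok.save("models/demo")                         # writes models/demo.model and models/demo.vocab

tok2 = BasicTokenizer()
tok2.load("models/demo.model")                  # restores pattern, special tokens, merges
assert tok2.decode(tok2.encode("hello world")) == "hello world"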
basic.py
"""
Minimal (byte-level) Byte Pair Encoding tokenizer.
Algorithmically follows along the GPT tokenizer:
https://github.com/openai/gpt-2/blob/master/src/encoder.py
But:
- Does not handle the regular expression splitting pattern.
- Does not handle any special tokens.
"""
from .base import Tokenizer, get_stats, merge
class BasicTokenizer(Tokenizer):
def __init__(self):
super().__init__()
def train(self, text, vocab_size, verbose=False):
assert vocab_size >= 256
num_merges = vocab_size - 256
# input text processing
text_bytes = text.encode("utf-8") # raw bytes
        ids = list(text_bytes) # list of integers in range 0..255
# iteratively merge the most common pairs to create new tokens
merges = {} # (int, int) -> int
vocab = {idx: bytes([idx]) for idx in range(256)} # int -> bytes
for i in range(num_merges):
stats = get_stats(ids)
pair = max(stats, key=stats.get)
idx = 256 + i
merges[pair] = idx
            vocab[idx] = vocab[pair[0]] + vocab[pair[1]]
if verbose:
print(f"merge {i+1}/{num_merges}: {pair} -> {idx} ({vocab[idx]}) had {stats[pair]} occurrences")
# save class variables
self.merges = merges # used in encode()
self.vocab = vocab # used in decode()
def decode(self, ids):
# given ids (list of integers), return Python string
text_bytes = b"".join(self.vocab[idx] for idx in ids)
text = text_bytes.decode("utf-8", errors="replace")
return text
def encode(self, text):
# given a string text, return the token ids
text_bytes = text.encode("utf-8")
ids = list(text_bytes)
        while len(ids) >= 2:
# find the pair with the lowest merge index
stats = get_stats(ids)
pair = min(stats, key=lambda p: self.merges.get(p, float("inf")))
if pair not in self.merges:
break # nothing else can be merged anymore
# otherwise let's merge the best pair (lowest merge index)
idx = self.merges[pair]
ids = merge(ids, pair, idx)
return ids
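A usage sketch (the classic BPE example from the Wikipedia article, also used in the minbpe README; the ids follow from the three merges minted above 256):

from minbpe import BasicTokenizer

tokenizer = BasicTokenizer()
text = "aaabdaaabac"
tokenizer.train(text, 256 + 3)  # 256 byte tokens, then 3 merges
print(tokenizer.encode(text))
# [258, 100, 258, 97, 99]
print(tokenizer.decode([258, 100, 258, 97, 99]))
# aaabdaaabac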
regex.py
"""
Minimal (byte-level) Byte Pair Encoding tokenizer.
Algorithmically follows along the GPT tokenizer:
https://github.com/openai/gpt-2/blob/master/src/encoder.py
Unlike BasicTokenizer:
- RegexTokenizer handles an optional regex splitting pattern.
- RegexTokenizer handles optional special tokens.
"""
import regex as re
from .base import Tokenizer, get_stats, merge
# the main GPT text split patterns, see
# https://github.com/openai/tiktoken/blob/main/tiktoken_ext/openai_public.py
GPT2_SPLIT_PATTERN = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
GPT4_SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""
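# e.g. the GPT-4 pattern chunks a string roughly like this (illustrative):
# >>> re.findall(GPT4_SPLIT_PATTERN, "Hello world123!!!?")
# ['Hello', ' world', '123', '!!!?']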
class RegexTokenizer(Tokenizer):
def __init__(self, pattern=None):
"""
- pattern: optional string to override the default (GPT-4 split pattern)
- special_tokens: str -> int dictionary of special tokens
example: {'<|endoftext|>': 100257}
"""
super().__init__()
self.pattern = GPT4_SPLIT_PATTERN if pattern is None else pattern
self.compiled_pattern = re.compile(self.pattern)
self.special_tokens = {}
self.inverse_special_tokens = {}
def train(self, text, vocab_size, verbose=False):
assert vocab_size >= 256
num_merges = vocab_size - 256
# split the text up into text chunks
text_chunks = re.findall(self.compiled_pattern, text)
# input text preprocessing
ids = [list(ch.encode("utf-8")) for ch in text_chunks]
# iteratively merge the most common pairs to create new tokens
merges = {} # (int, int) -> int
vocab = {idx: bytes([idx]) for idx in range(256)} # idx -> bytes
for i in range(num_merges):
# count the number of times every consecutive pair appears
stats = {}
for chunk_ids in ids:
# passing in stats will update it in place, adding up counts
get_stats(chunk_ids, stats)
# find the pair with the highest count
pair = max(stats, key=stats.get)
# mint a new token: assign it the next available id
idx = 256 + i
# replace all occurrences of pair in ids with idx
ids = [merge(chunk_ids, pair, idx) for chunk_ids in ids]
# save the merge
merges[pair] = idx
vocab[idx] = vocab[pair[0]] + vocab[pair[1]]
# prints
if verbose:
print(f"merge {i+1}/{num_merges}: {pair} -> {idx} ({vocab[idx]}) had {stats[pair]} occurrences")
# save class variables
self.merges = merges # used in encode()
self.vocab = vocab # used in decode()
def register_special_tokens(self, special_tokens):
# special_tokens is a dictionary of str -> int
# example: {"<|endoftext|>": 100257}
self.special_tokens = special_tokens
self.inverse_special_tokens = {v: k for k, v in special_tokens.items()}
def decode(self, ids):
# given ids (list of integers), return Python string
part_bytes = []
for idx in ids:
if idx in self.vocab:
part_bytes.append(self.vocab[idx])
elif idx in self.inverse_special_tokens:
part_bytes.append(self.inverse_special_tokens[idx].encode("utf-8"))
else:
raise ValueError(f"invalid token id: {idx}")
text_bytes = b"".join(part_bytes)
text = text_bytes.decode("utf-8", errors="replace")
return text
def _encode_chunk(self, text_bytes):
# return the token ids
# let's begin. first, convert all bytes to integers in range 0..255
ids = list(text_bytes)
while len(ids) >= 2:
# find the pair with the lowest merge index
stats = get_stats(ids)
pair = min(stats, key=lambda p: self.merges.get(p, float("inf")))
# subtle: if there are no more merges available, the key will
# result in an inf for every single pair, and the min will be
# just the first pair in the list, arbitrarily
# we can detect this terminating case by a membership check
if pair not in self.merges:
break # nothing else can be merged anymore
# otherwise let's merge the best pair (lowest merge index)
idx = self.merges[pair]
ids = merge(ids, pair, idx)
return ids
def encode_ordinary(self, text):
"""Encoding that ignores any special tokens."""
# split text into chunks of text by categories defined in regex pattern
text_chunks = re.findall(self.compiled_pattern, text)
# all chunks of text are encoded separately, then results are joined
ids = []
for chunk in text_chunks:
chunk_bytes = chunk.encode("utf-8") # raw bytes
chunk_ids = self._encode_chunk(chunk_bytes)
ids.extend(chunk_ids)
return ids
def encode(self, text, allowed_special="none_raise"):
"""
Unlike encode_ordinary, this function handles special tokens.
allowed_special: can be "all"|"none"|"none_raise" or a custom set of special tokens
if none_raise, then an error is raised if any special token is encountered in text
this is the default tiktoken behavior right now as well
any other behavior is either annoying, or a major footgun
"""
# decode the user desire w.r.t. handling of special tokens
special = None
if allowed_special == "all":
special = self.special_tokens
elif allowed_special == "none":
special = {}
elif allowed_special == "none_raise":
special = {}
assert all(token not in text for token in self.special_tokens)
elif isinstance(allowed_special, set):
special = {k: v for k, v in self.special_tokens.items() if k in allowed_special}
else:
raise ValueError(f"allowed_special={allowed_special} not understood")
if not special:
# shortcut: if no special tokens, just use the ordinary encoding
return self.encode_ordinary(text)
# otherwise, we have to be careful with potential special tokens in text
# we handle special tokens by splitting the text
# based on the occurrence of any exact match with any of the special tokens
# we can use re.split for this. note that surrounding the pattern with ()
# makes it into a capturing group, so the special tokens will be included
special_pattern = "(" + "|".join(re.escape(k) for k in special) + ")"
special_chunks = re.split(special_pattern, text)
# now all the special characters are separated from the rest of the text
# all chunks of text are encoded separately, then results are joined
ids = []
for part in special_chunks:
if part in special:
# this is a special token, encode it separately as a special case
ids.append(special[part])
else:
# this is an ordinary sequence, encode it normally
ids.extend(self.encode_ordinary(part))
return ids
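A usage sketch with special tokens (illustrative ids; any training text works, tests/taylorswift.txt is the file used by train.py below):

from minbpe import RegexTokenizer

tokenizer = RegexTokenizer()
text = open("tests/taylorswift.txt", "r", encoding="utf-8").read()
tokenizer.train(text, vocab_size=512)
tokenizer.register_special_tokens({"<|endoftext|>": 512})  # next free id after 0..511
ids = tokenizer.encode("<|endoftext|>hello world", allowed_special="all")
assert ids[0] == 512
assert tokenizer.decode(ids) == "<|endoftext|>hello world"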
gpt4.py
"""
Implements the GPT-4 Tokenizer as a light wrapper around the RegexTokenizer.
Note that this is a pretrained tokenizer. By default and inside init(), it
loads the pretrained tokenizer from the `cl100k_base` tokenizer of tiktoken.
"""
import tiktoken
from .regex import RegexTokenizer
def bpe(mergeable_ranks, token, max_rank):
# helper function used in get_gpt4_merges() to reconstruct the merge forest
parts = [bytes([b]) for b in token]
while True:
min_idx = None
min_rank = None
for i, pair in enumerate(zip(parts[:-1], parts[1:])):
rank = mergeable_ranks.get(pair[0] + pair[1])
if rank is not None and (min_rank is None or rank < min_rank):
min_idx = i
min_rank = rank
if min_rank is None or (max_rank is not None and min_rank >= max_rank):
break
assert min_idx is not None
parts = parts[:min_idx] + [parts[min_idx] + parts[min_idx + 1]] + parts[min_idx + 2:]
return parts
def recover_merges(mergeable_ranks):
# the `merges` are already the byte sequences in their merged state.
# so we have to recover the original pairings. We can do this by doing
# a small BPE training run on all the tokens, in their order.
# also see https://github.com/openai/tiktoken/issues/60
# also see https://github.com/karpathy/minbpe/issues/11#issuecomment-1950805306
merges = {}
for token, rank in mergeable_ranks.items():
if len(token) == 1:
continue # skip raw bytes
pair = tuple(bpe(mergeable_ranks, token, max_rank=rank))
assert len(pair) == 2
# recover the integer ranks of the pair
ix0 = mergeable_ranks[pair[0]]
ix1 = mergeable_ranks[pair[1]]
merges[(ix0, ix1)] = rank
return merges
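# a toy illustration (made-up ranks, not real cl100k_base values):
# >>> toy_ranks = {b"a": 0, b"b": 1, b"c": 2, b"ab": 3, b"abc": 4}
# >>> recover_merges(toy_ranks)
# {(0, 1): 3, (3, 2): 4}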
GPT4_SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""
GPT4_SPECIAL_TOKENS = {
'<|endoftext|>': 100257,
'<|fim_prefix|>': 100258,
'<|fim_middle|>': 100259,
'<|fim_suffix|>': 100260,
'<|endofprompt|>': 100276
}
class GPT4Tokenizer(RegexTokenizer):
"""Lightweight wrapper on RegexTokenizer that matches GPT-4's tokenizer."""
def __init__(self):
super().__init__(pattern=GPT4_SPLIT_PATTERN)
# get the official tokenizer and its merges
enc = tiktoken.get_encoding("cl100k_base")
mergeable_ranks = enc._mergeable_ranks
# the merges are those of gpt4, but we have to recover them
self.merges = recover_merges(mergeable_ranks)
# reconstruct the vocab from the merges
vocab = {idx: bytes([idx]) for idx in range(256)}
for (p0, p1), idx in self.merges.items():
vocab[idx] = vocab[p0] + vocab[p1]
self.vocab = vocab
# now here is another tricky part.
# for some reason, the tokens corresponding to individual bytes
# are permuted in a different order. This is completely non-sensical
# and probably historical, but therefore we have to deal with it here.
self.byte_shuffle = {i: mergeable_ranks[bytes([i])] for i in range(256)}
self.inverse_byte_shuffle = {v: k for k, v in self.byte_shuffle.items()}
# finally register the special tokens
self.register_special_tokens(GPT4_SPECIAL_TOKENS)
def _encode_chunk(self, text_bytes):
# before we start processing bytes, we have to permute them
text_bytes = bytes(self.byte_shuffle[b] for b in text_bytes)
ids = super()._encode_chunk(text_bytes)
return ids
def decode(self, ids):
# we have to un-permute the bytes before we decode
text_bytes = b"".join(self.vocab[idx] for idx in ids)
text_bytes = bytes(self.inverse_byte_shuffle[b] for b in text_bytes)
text = text_bytes.decode("utf-8", errors="replace")
return text
# this is a pretrained tokenizer, it is not intended to be trained
def train(self, text, vocab_size, verbose=False):
raise NotImplementedError
# save/load would require some thought.
# we'd have to change save/load of base to add support for byte_shuffle...
# alternatively, we could move byte_shuffle to base class, but that would
# mean that we're making ugly our beautiful Tokenizer just to support
# the GPT-4 tokenizer and its weird historical quirks around byte_shuffle.
def save(self, file_prefix):
raise NotImplementedError("GPT4Tokenizer cannot be saved.")
def load(self, model_file):
raise NotImplementedError("GPT4Tokenizer cannot be loaded.")
def save_vocab(self, vocab_file):
# just for visualization purposes let's output the GPT-4 tokens
# in the exact same format as the base class would.
# simple run as:
# python -c "from minbpe import GPT4Tokenizer; GPT4Tokenizer().save_vocab('gpt4.vocab')"
from .base import render_token
# build vocab being mindful of the byte shuffle
vocab = {idx: bytes([self.inverse_byte_shuffle[idx]]) for idx in range(256)}
for (p0, p1), idx in self.merges.items():
vocab[idx] = vocab[p0] + vocab[p1]
# now merge the shuffled bytes and write to file
inverted_merges = {idx: pair for pair, idx in self.merges.items()}
with open(vocab_file, "w", encoding="utf-8") as f:
for idx, token in vocab.items():
s = render_token(token)
if idx in inverted_merges:
idx0, idx1 = inverted_merges[idx]
s0 = render_token(vocab[idx0])
s1 = render_token(vocab[idx1])
f.write(f"[{s0}][{s1}] -> [{s}] {idx}\n")
else:
f.write(f"[{s}] {idx}\n")
train.py
"""
Train our Tokenizers on some data, just to see them in action.
The whole thing runs in ~25 seconds on my laptop.
"""
import os
import time
from minbpe import BasicTokenizer, RegexTokenizer
# open some text and train a vocab of 512 tokens
text = open("tests/taylorswift.txt", "r", encoding="utf-8").read()
# create a directory for models, so we don't pollute the current directory
os.makedirs("models", exist_ok=True)
t0 = time.time()
for TokenizerClass, name in zip([BasicTokenizer, RegexTokenizer], ["basic", "regex"]):
# construct the Tokenizer object and kick off verbose training
tokenizer = TokenizerClass()
tokenizer.train(text, 512, verbose=True)
# write two files in the models directory: name.model, and name.vocab
prefix = os.path.join("models", name)
tokenizer.save(prefix)
t1 = time.time()
print(f"Training took {t1-t0:.2f} seconds")