
convert : write tensors in parallel #12837

Open · wants to merge 4 commits into master (showing changes from 1 commit)
11 changes: 8 additions & 3 deletions convert_hf_to_gguf.py
@@ -73,7 +73,7 @@ def __init__(self, dir_model: Path, ftype: gguf.LlamaFileType, fname_out: Path,
use_temp_file: bool = False, eager: bool = False,
metadata_override: Path | None = None, model_name: str | None = None,
split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False,
small_first_shard: bool = False, hparams: dict[str, Any] | None = None):
small_first_shard: bool = False, hparams: dict[str, Any] | None = None, thread_count: int = 2):
if type(self) is Model:
raise TypeError(f"{type(self).__name__!r} should not be directly instantiated")

@@ -109,7 +109,8 @@ def __init__(self, dir_model: Path, ftype: gguf.LlamaFileType, fname_out: Path,

# Configure GGUF Writer
self.gguf_writer = gguf.GGUFWriter(path=None, arch=gguf.MODEL_ARCH_NAMES[self.model_arch], endianess=self.endianess, use_temp_file=self.use_temp_file,
split_max_tensors=split_max_tensors, split_max_size=split_max_size, dry_run=dry_run, small_first_shard=small_first_shard)
split_max_tensors=split_max_tensors, split_max_size=split_max_size, dry_run=dry_run, small_first_shard=small_first_shard,
thread_count=thread_count)

@classmethod
def __init_subclass__(cls):
@@ -5470,6 +5471,10 @@ def parse_args() -> argparse.Namespace:
"--print-supported-models", action="store_true",
help="Print the supported models"
)
parser.add_argument(
"-t", "--threads", type=int, default=2,
help="Number of threads to use when writing the tensors. Make sure you have enough RAM for at least THREADS of the biggest tensors in the model when setting this.",
)

args = parser.parse_args()
if not args.print_supported_models and args.model is None:
@@ -5554,7 +5559,7 @@ def main() -> None:
metadata_override=args.metadata, model_name=args.model_name,
split_max_tensors=args.split_max_tensors,
split_max_size=split_str_to_n_bytes(args.split_max_size), dry_run=args.dry_run,
small_first_shard=args.no_tensor_first_split)
small_first_shard=args.no_tensor_first_split, thread_count=args.threads)

if args.vocab_only:
logger.info("Exporting model vocab...")
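The hunks above thread the new -t/--threads CLI option through Model.__init__ into GGUFWriter as thread_count. A minimal sketch of what that means at the API level, assuming a hypothetical output path and architecture name (the converter normally derives both itself):

    # Minimal sketch, not the converter's actual call site; thread_count is the
    # only new parameter here, everything else keeps its existing defaults.
    import gguf

    writer = gguf.GGUFWriter(
        path="model.gguf",   # hypothetical output path
        arch="llama",        # hypothetical architecture name
        thread_count=4,      # added by this PR; defaults to 2
    )

From the command line the same value would come from something like: python convert_hf_to_gguf.py <model_dir> -t 4, with the caveat from the help text that each thread may hold one of the model's largest tensors in RAM at the same time.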
98 changes: 82 additions & 16 deletions gguf-py/gguf/gguf_writer.py
@@ -5,10 +5,12 @@
import shutil
import struct
import tempfile
import threading
from dataclasses import dataclass
from enum import Enum, auto
from math import prod
from pathlib import Path
from queue import Empty, Queue
from io import BufferedWriter
from typing import IO, Any, Sequence, Mapping
from string import ascii_letters, digits
@@ -60,8 +62,31 @@ class WriterState(Enum):
WEIGHTS = auto()


@dataclass
class TensorWriteInfo:
filename: Path
offset: int
post_pad: int
tensor: np.ndarray
bar: Any | None

def write_chunk(self, open_files: dict[Path, BufferedWriter]):
if self.filename not in open_files:
open_files[self.filename] = open(self.filename, "r+b")
f = open_files[self.filename]

f.seek(self.offset)
f.write(self.tensor.data)
if self.post_pad > 0:
f.write(bytes([0] * self.post_pad))
if self.bar is not None:
self.bar.update(self.tensor.nbytes)


class GGUFWriter:
fout: list[BufferedWriter] | None
filenames: list[Path] | None
thread_count: int
path: Path | None
temp_file: tempfile.SpooledTemporaryFile[bytes] | None
tensors: list[dict[str, TensorInfo]]
@@ -83,7 +108,8 @@ class GGUFWriter:

def __init__(
self, path: os.PathLike[str] | str | None, arch: str, use_temp_file: bool = False, endianess: GGUFEndian = GGUFEndian.LITTLE,
split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False, small_first_shard: bool = False
split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False, small_first_shard: bool = False,
thread_count: int = 2,
):
self.fout = None
self.path = Path(path) if path else None
@@ -98,6 +124,7 @@ def __init__(
self.split_max_size = split_max_size
self.dry_run = dry_run
self.small_first_shard = small_first_shard
self.thread_count = thread_count
logger.info("gguf: This GGUF file is for {0} Endian only".format(
"Big" if self.endianess == GGUFEndian.BIG else "Little",
))
@@ -173,6 +200,7 @@ def open_output_file(self, path: Path | None = None) -> None:

if self.path is not None:
filenames = self.print_plan()
self.filenames = filenames
self.fout = [open(filename, "wb") for filename in filenames]
self.state = WriterState.EMPTY

@@ -424,40 +452,78 @@ def write_tensors_to_file(self, *, progress: bool = False) -> None:
self.write_ti_data_to_file()

assert self.fout is not None
assert self.filenames is not None

for fout in self.fout:
self.write_padding(fout, fout.tell())

if self.temp_file is None:
shard_bar = None
bar = None
# Distribute writing the tensors between multiple threads
tensor_queue: Queue[TensorWriteInfo] = Queue()

offsets: list[int] = [fout.tell() for fout in self.fout]

if progress:
# TODO: add back the shard bar to show which shard is being written when single-threaded
from tqdm import tqdm

total_bytes = sum(ti.nbytes for t in self.tensors for ti in t.values())

if len(self.fout) > 1:
shard_bar = tqdm(desc=f"Shard (0/{len(self.fout)})", total=None, unit="byte", unit_scale=True)
bar = tqdm(desc="Writing", total=total_bytes, unit="byte", unit_scale=True)

for i, (fout, tensors) in enumerate(zip(self.fout, self.tensors)):
if shard_bar is not None:
shard_bar.set_description(f"Shard ({i + 1}/{len(self.fout)})")
total = sum(ti.nbytes for ti in tensors.values())
shard_bar.reset(total=(total if total > 0 else None))
for i, (filename, tensors) in enumerate(zip(self.filenames, self.tensors)):
offset = offsets[i]

# relying on the fact that Python dicts preserve insertion order (since 3.7)
for ti in tensors.values():
assert ti.tensor is not None # can only iterate once over the tensors
assert ti.tensor.nbytes == ti.nbytes
ti.tensor.tofile(fout)
if shard_bar is not None:
shard_bar.update(ti.nbytes)
if bar is not None:
bar.update(ti.nbytes)
self.write_padding(fout, ti.nbytes)
ti.tensor = None
start_offset = offset
nbytes = ti.tensor.nbytes
offset = self.ggml_pad(start_offset + nbytes, self.data_alignment)
padding = offset - (start_offset + nbytes)
tensor_queue.put(
TensorWriteInfo(
filename=filename,
offset=start_offset,
post_pad=padding,
tensor=ti.tensor,
bar=bar,
)
)
ti.tensor = None # avoid keeping a reference to written tensors

# Write tensors in parallel
# TODO: total tensor size limit for the running threads
def write_tensors_from_thread(queue: Queue[TensorWriteInfo]):
open_files: dict[Path, BufferedWriter] = {}
try:
while t := queue.get_nowait():
t.write_chunk(open_files)
del t
queue.task_done()
except Empty:
pass

for f in open_files.values():
f.close()

threads = [
threading.Thread(target=write_tensors_from_thread, args=(tensor_queue,))
for _ in range(self.thread_count)
]

for t in threads:
t.start()

# NOTE: thread joining has weird interactions with KeyboardInterrupt,
# so we wait for the queue to be "done" first.
tensor_queue.join()

for t in threads:
t.join()

else:
self.temp_file.seek(0)

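The heart of the gguf_writer.py change is a producer/consumer split: the main thread walks the shards once, pre-computes each tensor's absolute offset and trailing padding, and enqueues a TensorWriteInfo per tensor; worker threads then pop items and seek/write through their own file handles into regions that never overlap. A standalone sketch of that pattern under simplified assumptions (plain byte chunks, one hypothetical output file, none of the GGUF bookkeeping):

    import threading
    from queue import Empty, Queue

    def parallel_write(path: str, chunks: list[bytes], thread_count: int = 2) -> None:
        # Producer: fix non-overlapping offsets up front and enqueue every work item.
        work: Queue[tuple[int, bytes]] = Queue()
        offset = 0
        for chunk in chunks:
            work.put((offset, chunk))
            offset += len(chunk)

        # Pre-size the file so every worker can seek anywhere inside it.
        with open(path, "wb") as f:
            f.truncate(offset)

        # Consumers: each worker keeps its own handle and drains the queue.
        def worker() -> None:
            with open(path, "r+b") as f:
                try:
                    while True:
                        off, data = work.get_nowait()
                        f.seek(off)
                        f.write(data)
                        work.task_done()
                except Empty:
                    pass

        threads = [threading.Thread(target=worker) for _ in range(thread_count)]
        for t in threads:
            t.start()
        work.join()        # every chunk has been written
        for t in threads:
            t.join()

    parallel_write("example.bin", [b"a" * 16, b"b" * 32, b"c" * 8], thread_count=2)

The sketch keeps the same shutdown order as the diff, queue join first and thread join second, which the NOTE in the hunk attributes to Thread.join interacting badly with KeyboardInterrupt.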
5 changes: 5 additions & 0 deletions gguf-py/gguf/lazy.py
@@ -220,4 +220,9 @@ def tofile(self, *args, **kwargs):
eager = LazyNumpyTensor.to_eager(self)
return eager.tofile(*args, **kwargs)

@property
def data(self):
eager = LazyNumpyTensor.to_eager(self)
return eager.data

# TODO: __array_function__
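The new data property is needed because the threaded writer calls f.write(self.tensor.data) in TensorWriteInfo.write_chunk instead of tensor.tofile(f), so a lazy tensor must materialize itself when its buffer is requested. A toy illustration of that lazy-to-eager property pattern (not the real LazyNumpyTensor, which carries considerably more machinery):

    import numpy as np

    class ToyLazyTensor:
        def __init__(self, make_array):
            self._make_array = make_array    # deferred computation

        def to_eager(self) -> np.ndarray:
            return self._make_array()

        @property
        def data(self) -> memoryview:
            # Materialize on access so callers can treat this like a real ndarray.
            return self.to_eager().data

    lazy = ToyLazyTensor(lambda: np.arange(4, dtype=np.float32))
    with open("toy.bin", "wb") as f:
        f.write(lazy.data)                   # same code path as with an eager array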