-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathllama_cpp_chat_completion_wrapper.py
132 lines (108 loc) · 4.94 KB
/
llama_cpp_chat_completion_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from llama_cpp import Llama
from functools import partial
from typing import List, Literal, TypedDict, Callable
Role = Literal["system", "user", "assistant"]
class Message(TypedDict):
role: Role
content: str
B_INST, E_INST = "[INST]", "[/INST]"
B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"
DEFAULT_SYSTEM_PROMPT = """\
You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature.
If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information."""
def _llama2_format_messages(messages: List[Message], tokenizer_encode: Callable) -> List[int]:
if messages[0]["role"] != "system":
messages = [
{
"role": "system",
"content": DEFAULT_SYSTEM_PROMPT,
}
] + messages
messages = [
{
"role": messages[1]["role"],
"content": B_SYS + messages[0]["content"] + E_SYS + messages[1]["content"],
}
] + messages[2:]
assert all([msg["role"] == "user" for msg in messages[::2]]) and all(
[msg["role"] == "assistant" for msg in messages[1::2]]
), (
"model only supports 'system', 'user' and 'assistant' roles, "
"starting with 'system', then 'user' and alternating (u/a/u/a/u...)"
)
messages_tokens: List[int] = sum(
[
tokenizer_encode(
f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} ",
bos=True,
eos=True,
)
for prompt, answer in zip(
messages[::2],
messages[1::2],
)
],
[],
)
assert messages[-1]["role"] == "user", f"Last message must be from user, got {messages[-1]['role']}"
messages_tokens += tokenizer_encode(
f"{B_INST} {(messages[-1]['content']).strip()} {E_INST}",
bos=True,
eos=False,
)
return messages_tokens
def _llama_cpp_tokenizer_encode(s: str, bos: bool, eos: bool, llm: Llama) -> List[int]:
assert type(s) is str
t = llm.tokenize(text=b" " + bytes(s, encoding="utf-8"), add_bos=False)
if bos:
t = [llm.token_bos()] + t
if eos:
t = t + [llm.token_eos()]
return t
class Llama2ChatCompletionWrapper:
def __init__(self, model_path: str, callback: Callable[[Message], None] = None, tokenizer_encoder: Callable = None) -> None:
self.llm = Llama(model_path=model_path)
if tokenizer_encoder is None:
self._tokenizer_encode = partial(_llama_cpp_tokenizer_encode, llm=self.llm)
else:
self._tokenizer_encode = tokenizer_encoder
self.callback = callback
def new_session(self, system_content: str | None = None, messages: List[Message] | None = None):
self.messages: List[Message] = []
# if self.callback is not None:
# self.callback()
if system_content is not None:
assert messages is None
self.messages.append(Message(role="system", content=system_content))
if self.callback is not None:
self.callback(self.messages[-1])
elif messages is not None:
self.messages = messages
if self.callback is not None:
for msg in self.messages:
self.callback(msg)
def __call__(
self, message: str, post_process: Callable[[str], str] | None = None, max_tokens: int = 128, params: dict = {}
) -> str:
self.messages.append(Message(role="user", content=message))
if self.callback is not None:
self.callback(self.messages[-1])
messages_tokens = _llama2_format_messages(self.messages, tokenizer_encode=self._tokenizer_encode)
completion = self.llm.generate(messages_tokens, **params)
max_tokens = (
max_tokens if max_tokens + len(messages_tokens) < self.llm._n_ctx else (self.llm._n_ctx - len(messages_tokens))
)
result = []
for i, token in enumerate(completion):
if max_tokens == i or token == self.llm.token_eos():
break
result.append(self.llm.detokenize([token]).decode("utf-8"))
result = "".join(result).strip()
if post_process is not None:
# if self.callback is not None:
# self.callback()
result = post_process(result)
self.messages.append(Message(role="assistant", content=result))
if self.callback is not None:
self.callback(self.messages[-1])
return result