|
| 1 | +// Upload file to server using /files API |
| 2 | + |
| 3 | +import * as core from '../core'; |
| 4 | +import { isAxiosError } from 'axios'; |
| 5 | +import fs from 'fs'; |
| 6 | +import fetch from 'node-fetch'; |
| 7 | +import * as path from 'path'; |
| 8 | +import progress from 'progress-stream'; |
| 9 | +import readline from 'readline'; |
| 10 | +import pkg from 'parquetjs'; |
| 11 | +const { ParquetReader } = pkg; |
| 12 | + |
| 13 | +export interface FileResponse { |
| 14 | + id: string; |
| 15 | + object: string; |
| 16 | + type: 'jsonl' | 'parquet'; |
| 17 | + purpose: 'fine-tune'; |
| 18 | + filename: string; |
| 19 | + bytes: number; |
| 20 | + line_count: number; |
| 21 | + processed: boolean; |
| 22 | +} |
| 23 | + |
| 24 | +export interface ErrorResponse { |
| 25 | + message: string; |
| 26 | +} |
| 27 | + |
| 28 | +const failedUploadMessage = { |
| 29 | + message: 'failed to upload file', |
| 30 | +}; |
| 31 | + |
| 32 | +const baseURL = 'https://api.together.xyz/v1'; |
| 33 | +const MAX_FILE_SIZE = 4.8; // GB |
| 34 | +const BYTES_PER_GB = 1024 * 1024 * 1024; |
| 35 | +const MIN_SAMPLES = 1; |
| 36 | +const PARQUET_EXPECTED_COLUMNS = ['input_ids', 'attention_mask', 'labels']; |
| 37 | + |
| 38 | +export interface CheckFileResponse { |
| 39 | + success: boolean; |
| 40 | + message?: string; |
| 41 | +} |
| 42 | +export async function check_file(fileName: string): Promise<CheckFileResponse> { |
| 43 | + const stat = fs.statSync(fileName); |
| 44 | + if (stat.size == 0) { |
| 45 | + return { success: false, message: `File is empty` }; |
| 46 | + } |
| 47 | + |
| 48 | + if (stat.size > MAX_FILE_SIZE * BYTES_PER_GB) { |
| 49 | + return { success: false, message: `File size exceeds the limit of ${MAX_FILE_SIZE} GB` }; |
| 50 | + } |
| 51 | + |
| 52 | + const fileType = path.extname(fileName); |
| 53 | + if (fileType !== '.jsonl' && fileType !== '.parquet') { |
| 54 | + return { |
| 55 | + success: false, |
| 56 | + message: 'File type must be either .jsonl or .parquet', |
| 57 | + }; |
| 58 | + } |
| 59 | + |
| 60 | + if (fileType == '.jsonl') { |
| 61 | + const jsonlCheck = await check_jsonl(fileName); |
| 62 | + if (jsonlCheck) { |
| 63 | + return { success: false, message: jsonlCheck }; |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + if (fileType == '.parquet') { |
| 68 | + const parquetCheck = await check_parquet(fileName); |
| 69 | + if (parquetCheck) { |
| 70 | + return { success: false, message: parquetCheck }; |
| 71 | + } |
| 72 | + } |
| 73 | + |
| 74 | + return { success: true }; |
| 75 | +} |
| 76 | + |
| 77 | +export async function check_parquet(fileName: string): Promise<string | undefined> { |
| 78 | + try { |
| 79 | + const reader = await ParquetReader.openFile(fileName); |
| 80 | + const cursor = reader.getCursor(); |
| 81 | + let record = null; |
| 82 | + |
| 83 | + const fieldNames = Object.keys(reader.schema.fields); |
| 84 | + if (!('input_ids' in fieldNames)) { |
| 85 | + return `Parquet file ${fileName} does not contain the 'input_ids' column.`; |
| 86 | + } |
| 87 | + |
| 88 | + for (const fieldName in fieldNames) { |
| 89 | + if (!PARQUET_EXPECTED_COLUMNS.includes(fieldName)) { |
| 90 | + return `Parquet file ${fileName} contains unexpected column ${fieldName}. Only ${PARQUET_EXPECTED_COLUMNS.join( |
| 91 | + ', ', |
| 92 | + )} are supported`; |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + const numRows = reader.getRowCount() as unknown as number; |
| 97 | + if (numRows < MIN_SAMPLES) { |
| 98 | + return `Parquet file ${fileName} contains only ${numRows} samples. Minimum of ${MIN_SAMPLES} samples are required`; |
| 99 | + } |
| 100 | + |
| 101 | + await reader.close(); |
| 102 | + } catch (err) { |
| 103 | + return `failed to read parquet file ${fileName}`; |
| 104 | + } |
| 105 | + |
| 106 | + return undefined; |
| 107 | +} |
| 108 | + |
| 109 | +// return undefined if the file is valid, otherwise return an error message |
| 110 | +export async function check_jsonl(fileName: string): Promise<string | undefined> { |
| 111 | + const fileStream = fs.createReadStream(fileName); |
| 112 | + |
| 113 | + const rl = readline.createInterface({ |
| 114 | + input: fileStream, |
| 115 | + crlfDelay: Infinity, |
| 116 | + }); |
| 117 | + |
| 118 | + let errors: string[] = []; |
| 119 | + let lineNumber = 1; |
| 120 | + for await (const line of rl) { |
| 121 | + try { |
| 122 | + // do not proceed if there are too many errors |
| 123 | + if (errors.length > 20) { |
| 124 | + break; |
| 125 | + } |
| 126 | + const parsedLine = JSON.parse(line); |
| 127 | + |
| 128 | + if (typeof parsedLine !== 'object') { |
| 129 | + errors.push(`Line number ${lineNumber} is not a valid JSON object`); |
| 130 | + continue; |
| 131 | + } |
| 132 | + |
| 133 | + if (!('text' in parsedLine)) { |
| 134 | + errors.push( |
| 135 | + `Missing 'text' field was found on line ${lineNumber} of the the input file. Expected format: {'text': 'my sample string'}.`, |
| 136 | + ); |
| 137 | + continue; |
| 138 | + } |
| 139 | + |
| 140 | + if (typeof parsedLine['text'] !== 'string') { |
| 141 | + errors.push(`'Invalid value type for "text" key on line ${lineNumber}. Expected string`); |
| 142 | + continue; |
| 143 | + } |
| 144 | + } catch (error) { |
| 145 | + errors.push(`Error parsing line number ${lineNumber}`); |
| 146 | + } |
| 147 | + lineNumber += 1; |
| 148 | + } |
| 149 | + |
| 150 | + lineNumber -= 1; |
| 151 | + if (lineNumber < MIN_SAMPLES) { |
| 152 | + errors.push(`Processing ${fileName} resulted in only ${lineNumber - 1} samples.`); |
| 153 | + } |
| 154 | + |
| 155 | + if (errors.length > 0) { |
| 156 | + return errors.join('\n'); |
| 157 | + } |
| 158 | + |
| 159 | + return undefined; |
| 160 | +} |
| 161 | + |
| 162 | +export async function upload(fileName: string, check: boolean = true): Promise<FileResponse | ErrorResponse> { |
| 163 | + let purpose = 'fine-tune'; |
| 164 | + if (!fs.existsSync(fileName)) { |
| 165 | + return { |
| 166 | + message: 'File does not exists', |
| 167 | + }; |
| 168 | + } |
| 169 | + |
| 170 | + const fileType = path.extname(fileName); |
| 171 | + if (fileType !== '.jsonl' && fileType !== '.parquet') { |
| 172 | + return { |
| 173 | + message: 'File type must be either .jsonl or .parquet', |
| 174 | + }; |
| 175 | + } |
| 176 | + |
| 177 | + if (check) { |
| 178 | + const checkFile = await check_file(fileName); |
| 179 | + if (!checkFile.success) { |
| 180 | + return { |
| 181 | + message: checkFile.message || `verification of ${fileName} failed with some unknown reason`, |
| 182 | + }; |
| 183 | + } |
| 184 | + } |
| 185 | + |
| 186 | + // steps to do |
| 187 | + // 1. check if file exists |
| 188 | + // 2. get signed upload url |
| 189 | + // 3. upload file |
| 190 | + const baseUrl = core.readEnv('TOGETHER_API_BASE_URL') || 'https://api.together.ai/v1'; |
| 191 | + const apiKey = |
| 192 | + core.readEnv('TOGETHER_API_KEY') || '3c24363cf5506cb56b48e7e99de5e182a1a544965f3d9f38833a6db35d6f7aad'; |
| 193 | + |
| 194 | + if (!apiKey) { |
| 195 | + return { |
| 196 | + message: 'API key is required', |
| 197 | + }; |
| 198 | + } |
| 199 | + |
| 200 | + const getSigned = baseURL + '/files'; |
| 201 | + |
| 202 | + try { |
| 203 | + const params = new URLSearchParams({ |
| 204 | + file_name: fileName, |
| 205 | + purpose: purpose, |
| 206 | + }); |
| 207 | + const fullUrl = `${getSigned}?${params}`; |
| 208 | + const r = await fetch(fullUrl, { |
| 209 | + method: 'POST', |
| 210 | + headers: { |
| 211 | + 'Content-Type': 'application/x-www-form-urlencoded', |
| 212 | + Authorization: `Bearer ${apiKey}`, |
| 213 | + }, |
| 214 | + redirect: 'manual', |
| 215 | + body: params.toString(), |
| 216 | + }); |
| 217 | + |
| 218 | + if (r.status !== 302) { |
| 219 | + return failedUploadMessage; |
| 220 | + } |
| 221 | + |
| 222 | + const uploadUrl = r.headers.get('location') || ''; |
| 223 | + if (!uploadUrl || uploadUrl === '') { |
| 224 | + return failedUploadMessage; |
| 225 | + } |
| 226 | + const fileId = r.headers.get('x-together-file-id') || ''; |
| 227 | + if (!fileId || fileId === '') { |
| 228 | + return failedUploadMessage; |
| 229 | + } |
| 230 | + |
| 231 | + const fileStream = fs.createReadStream(fileName); |
| 232 | + const fileSize = fs.statSync(fileName).size; |
| 233 | + |
| 234 | + const progressStream = progress({ |
| 235 | + length: fileSize, |
| 236 | + time: 100, // Emit progress events every 100ms |
| 237 | + }); |
| 238 | + |
| 239 | + // Listen to progress events and log them |
| 240 | + progressStream.on('progress', (progress) => { |
| 241 | + displayProgress(progress.percentage); |
| 242 | + }); |
| 243 | + |
| 244 | + let uploadedBytes = 0; |
| 245 | + // upload the file to uploadUrl |
| 246 | + const uploadResponse = await fetch(uploadUrl, { |
| 247 | + method: 'PUT', |
| 248 | + headers: { |
| 249 | + 'Content-Type': 'application/octet-stream', |
| 250 | + }, |
| 251 | + body: fileStream.pipe(progressStream), |
| 252 | + }); |
| 253 | + |
| 254 | + displayProgress(100); |
| 255 | + process.stdout.write('\n'); |
| 256 | + |
| 257 | + return { |
| 258 | + id: fileId, |
| 259 | + object: 'file', |
| 260 | + type: 'jsonl', |
| 261 | + purpose: 'fine-tune', |
| 262 | + filename: fileName, |
| 263 | + bytes: fileSize, |
| 264 | + line_count: 0, |
| 265 | + processed: true, |
| 266 | + }; |
| 267 | + } catch (error) { |
| 268 | + if (isAxiosError(error)) { |
| 269 | + // handle axios error here |
| 270 | + if (error.status) { |
| 271 | + return { |
| 272 | + message: `failed to upload file with status ${error.status}`, |
| 273 | + }; |
| 274 | + } |
| 275 | + } |
| 276 | + |
| 277 | + return { |
| 278 | + message: 'failed to upload file', |
| 279 | + }; |
| 280 | + } |
| 281 | +} |
| 282 | + |
| 283 | +async function displayProgress(progress: number) { |
| 284 | + const barWidth = 40; // Number of characters for the progress bar |
| 285 | + const completedBars = Math.round((progress / 100) * barWidth); |
| 286 | + let remainingBars = barWidth - completedBars; |
| 287 | + if (remainingBars < 0) { |
| 288 | + remainingBars = 0; |
| 289 | + } |
| 290 | + const progressBar = `[${'='.repeat(completedBars)}${' '.repeat(remainingBars)}] ${progress.toFixed(2)}%`; |
| 291 | + |
| 292 | + // Clear the line and write progress |
| 293 | + //process.stdout.clearLine(0); //clean entire line |
| 294 | + process.stdout.cursorTo(0); |
| 295 | + process.stdout.write(progressBar, () => {}); |
| 296 | + await sleep(2000); |
| 297 | +} |
| 298 | + |
| 299 | +async function sleep(ms: number): Promise<void> { |
| 300 | + return new Promise((resolve) => setTimeout(resolve, ms)); |
| 301 | +} |
0 commit comments