Skip to content

Commit fc2378e

Browse files
authored
feat: CompressionStream (#36)
* chore: initial compression stream impl., a bit hacky, but works * feat * chore: fix
1 parent 3d74147 commit fc2378e

34 files changed

+1222
-37
lines changed

example/server/createServer.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ export function createServer(payload: string | Bun.BufferSource, port: number) {
2020
return new Response('No body provided', { status: 400 })
2121
}
2222

23+
const writer = Bun.file('uploaded_file').writer()
24+
2325
for await (const chunk of req.body) {
24-
console.log('Chunk:', chunk)
26+
// Write each chunk to file
27+
writer.write(chunk)
28+
// Debug
29+
console.log('Chunk saved:', chunk.length, 'bytes')
2530
}
2631

32+
await writer.end()
33+
34+
console.log('Upload complete, file saved')
35+
2736
return new Response('Upload successful', { status: 200 })
2837
} catch (error) {
2938
console.error('Upload error:', error)

example/tests/benchmark.tsx

+29-10
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ import {
99
TouchableOpacity,
1010
View,
1111
} from 'react-native'
12-
import { fetch, showOpenFilePicker, WebSocket as FastWS } from 'react-native-fast-io'
12+
import {
13+
CompressionStream,
14+
fetch,
15+
showOpenFilePicker,
16+
WebSocket as FastWS,
17+
} from 'react-native-fast-io'
1318

1419
import {
1520
CHAT_PAYLOAD,
@@ -443,21 +448,35 @@ const styles = StyleSheet.create({
443448
},
444449
})
445450

446-
// tbd: playground
451+
// EXAMPLE 1
447452
setTimeout(async () => {
448-
const files = await showOpenFilePicker({
449-
startIn: 'desktop',
450-
types: [{ description: 'PDF files', accept: { 'application/pdf': ['.pdf'] } }],
453+
// File System API - https://developer.mozilla.org/en-US/docs/Web/API/File_System_API
454+
const [fileHandle] = await showOpenFilePicker()
455+
456+
// Get `File` object (file is not loaded into memory)
457+
const file = await fileHandle.getFile()
458+
459+
// File is streamed to the server
460+
await fetch('http://localhost:3002/upload', {
461+
method: 'POST',
462+
body: file,
451463
})
464+
}, 2000)
465+
466+
// EXAMPLE 2
467+
setTimeout(async () => {
468+
// File System API - https://developer.mozilla.org/en-US/docs/Web/API/File_System_API
469+
const [fileHandle] = await showOpenFilePicker()
452470

453-
if (!files[0]) {
454-
console.log('No file')
455-
}
471+
// Get `File` object (file is not loaded into memory)
472+
const file = await fileHandle.getFile()
456473

457-
const file = await files[0].getFile()
474+
// You can also transform the stream in JavaScript, still no loading, all lazy
475+
const compressed = file.stream().pipeThrough(new CompressionStream('gzip'))
458476

477+
// File is streamed to the server
459478
await fetch('http://localhost:3002/upload', {
460479
method: 'POST',
461-
body: file.stream(),
480+
body: compressed,
462481
})
463482
}, 2000)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
//
2+
// HybridGzipCompressor.swift
3+
// FastIO
4+
//
5+
// Created by Mike Grabowski on 11/11/2024.
6+
//
7+
8+
import Foundation
9+
import NitroModules
10+
import Compression
11+
12+
class HybridCompressor : HybridCompressorSpec {
13+
private var stream: compression_stream
14+
private var status: compression_status
15+
16+
private var format: CompressionAlgorithm
17+
private var totalSize: UInt32 = 0
18+
19+
init(algorithm: CompressionAlgorithm) {
20+
stream = compression_stream()
21+
status = compression_stream_init(&stream, COMPRESSION_STREAM_ENCODE, COMPRESSION_ZLIB)
22+
format = algorithm
23+
}
24+
25+
deinit {
26+
compression_stream_destroy(&stream)
27+
}
28+
29+
private func compressBuffer(source: UnsafePointer<UInt8>, sourceSize: Int, finalize: Bool = false) throws -> ArrayBufferHolder {
30+
let headerSize: Int = if totalSize == 0 {
31+
switch format {
32+
case .gzip: 10
33+
case .deflate: 2
34+
case .deflateRaw: 0
35+
}
36+
} else {
37+
0
38+
}
39+
let footerSize = (format == .gzip && finalize) ? 8 : 0
40+
41+
let destBufferSize = 64 * 1024 + headerSize + footerSize
42+
let destBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: destBufferSize)
43+
44+
if headerSize > 0 {
45+
switch format {
46+
case .gzip:
47+
let header = getGzipHeader()
48+
destBuffer.update(from: header, count: header.count)
49+
case .deflate:
50+
let header = getDeflateHeader()
51+
destBuffer.update(from: header, count: header.count)
52+
default:
53+
break
54+
}
55+
}
56+
57+
if format == .gzip && sourceSize > 0 {
58+
updateCRC32(data: source, size: sourceSize)
59+
}
60+
61+
totalSize = (totalSize &+ UInt32(sourceSize)) & 0xffffffff
62+
63+
stream.src_ptr = source
64+
stream.src_size = sourceSize
65+
stream.dst_ptr = destBuffer.advanced(by: headerSize)
66+
stream.dst_size = 64 * 1024
67+
68+
status = compression_stream_process(&stream, Int32(finalize ? COMPRESSION_STREAM_FINALIZE.rawValue : 0))
69+
70+
guard status != COMPRESSION_STATUS_ERROR else {
71+
destBuffer.deallocate()
72+
throw RuntimeError.error(withMessage: "Compression error")
73+
}
74+
75+
guard stream.src_size == 0 else {
76+
destBuffer.deallocate()
77+
throw RuntimeError.error(withMessage: "Unexpected remaining input data.")
78+
}
79+
80+
let currentOffset = headerSize + (64 * 1024 - stream.dst_size)
81+
82+
if footerSize > 0 {
83+
let footer = getGzipFooter()
84+
destBuffer.advanced(by: currentOffset).update(from: footer, count: footer.count)
85+
}
86+
87+
let deleteFunc = SwiftClosure {
88+
destBuffer.deallocate()
89+
}
90+
91+
return ArrayBufferHolder.makeBuffer(destBuffer, currentOffset + footerSize, deleteFunc)
92+
}
93+
94+
func compress(chunk: ArrayBufferHolder) throws -> ArrayBufferHolder {
95+
return try compressBuffer(
96+
source: UnsafePointer(chunk.data.assumingMemoryBound(to: UInt8.self)),
97+
sourceSize: chunk.size
98+
)
99+
}
100+
101+
func finalize() throws -> ArrayBufferHolder {
102+
let emptyBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 1)
103+
defer {
104+
emptyBuffer.deallocate()
105+
}
106+
107+
return try compressBuffer(
108+
source: UnsafePointer(emptyBuffer),
109+
sourceSize: 0,
110+
finalize: true
111+
)
112+
}
113+
114+
/* Gzip */
115+
private var crc32: UInt32 = 0
116+
117+
private func getGzipHeader() -> [UInt8] {
118+
// GZIP header format:
119+
// 1F 8B - Magic number
120+
// 08 - Deflate compression method
121+
// 00 - Flags
122+
// 00 00 00 00 - Timestamp
123+
// 00 - Extra flags
124+
// FF - OS (unknown)
125+
return [0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff]
126+
}
127+
128+
private func getGzipFooter() -> [UInt8] {
129+
var footer = [UInt8](repeating: 0, count: 8)
130+
// CRC32 (4 bytes)
131+
footer[0] = UInt8(crc32 & 0xff)
132+
footer[1] = UInt8((crc32 >> 8) & 0xff)
133+
footer[2] = UInt8((crc32 >> 16) & 0xff)
134+
footer[3] = UInt8((crc32 >> 24) & 0xff)
135+
// Input size modulo 2^32 (4 bytes)
136+
footer[4] = UInt8(totalSize & 0xff)
137+
footer[5] = UInt8((totalSize >> 8) & 0xff)
138+
footer[6] = UInt8((totalSize >> 16) & 0xff)
139+
footer[7] = UInt8((totalSize >> 24) & 0xff)
140+
return footer
141+
}
142+
143+
private func updateCRC32(data: UnsafePointer<UInt8>, size: Int) {
144+
var crc = ~crc32
145+
for i in 0..<size {
146+
let byte = data[i]
147+
crc = (crc >> 8) ^ crcTable[Int((crc & 0xFF) ^ UInt32(byte))]
148+
}
149+
crc32 = ~crc
150+
}
151+
152+
/* Deflate */
153+
private func getDeflateHeader() -> [UInt8] {
154+
// Deflate header (CMF, FLG)
155+
return [0x78, 0x9c]
156+
}
157+
158+
var hybridContext = margelo.nitro.HybridContext()
159+
var memorySize: Int {
160+
return getSizeOf(self)
161+
}
162+
}
163+
164+
// CRC32 lookup table
165+
private let crcTable: [UInt32] = {
166+
var table = [UInt32](repeating: 0, count: 256)
167+
for i in 0..<256 {
168+
var crc = UInt32(i)
169+
for _ in 0..<8 {
170+
crc = (crc >> 1) ^ ((crc & 1) == 1 ? 0xEDB88320 : 0)
171+
}
172+
table[i] = crc
173+
}
174+
return table
175+
}()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//
2+
// HybridCompressorFactory.swift
3+
// FastIO
4+
//
5+
// Created by Mike Grabowski on 11/11/2024.
6+
//
7+
8+
import Foundation
9+
10+
class HybridCompressorFactory : HybridCompressorFactorySpec {
11+
func create(algorithm: CompressionAlgorithm) throws -> (any HybridCompressorSpec) {
12+
return HybridCompressor(algorithm: algorithm)
13+
}
14+
15+
var hybridContext = margelo.nitro.HybridContext()
16+
var memorySize: Int {
17+
return getSizeOf(self)
18+
}
19+
}

packages/react-native-fast-io/ios/HybridDuplexStream.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class HybridDuplexStream : HybridDuplexStreamSpec {
1515
var inputStreamRef: InputStream? = InputStream()
1616
var outputStreamRef: OutputStream? = OutputStream(toMemory: ())
1717

18-
Stream.getBoundStreams(withBufferSize: 4096, inputStream: &inputStreamRef, outputStream: &outputStreamRef)
18+
Stream.getBoundStreams(withBufferSize: 64 * 1024, inputStream: &inputStreamRef, outputStream: &outputStreamRef)
1919

2020
guard let inputStreamRef, let outputStreamRef else {
2121
fatalError("Could not create streams")

packages/react-native-fast-io/nitro.json

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
},
2121
"DuplexStream": {
2222
"swift": "HybridDuplexStream"
23+
},
24+
"CompressorFactory": {
25+
"swift": "HybridCompressorFactory"
2326
}
2427
}
2528
}

packages/react-native-fast-io/nitrogen/generated/android/FastIO+autolinking.cmake

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ target_sources(
3131
../nitrogen/generated/shared/c++/HybridNetworkSpec.cpp
3232
../nitrogen/generated/shared/c++/HybridInputStreamSpec.cpp
3333
../nitrogen/generated/shared/c++/HybridOutputStreamSpec.cpp
34+
../nitrogen/generated/shared/c++/HybridCompressorFactorySpec.cpp
35+
../nitrogen/generated/shared/c++/HybridCompressorSpec.cpp
3436
../nitrogen/generated/shared/c++/HybridDuplexStreamSpec.cpp
3537
../nitrogen/generated/shared/c++/HybridWebSocketSpec.cpp
3638
../nitrogen/generated/shared/c++/HybridWebSocketManagerSpec.cpp

packages/react-native-fast-io/nitrogen/generated/ios/FastIO-Swift-Cxx-Bridge.cpp

+34
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
// Include C++ implementation defined types
1111
#include "FastIO-Swift-Cxx-Umbrella.hpp"
12+
#include "HybridCompressorFactorySpecSwift.hpp"
13+
#include "HybridCompressorSpecSwift.hpp"
1214
#include "HybridDuplexStreamSpecSwift.hpp"
1315
#include "HybridFileSystemSpecSwift.hpp"
1416
#include "HybridInputStreamSpecSwift.hpp"
@@ -84,6 +86,38 @@ namespace margelo::nitro::fastio::bridge::swift {
8486
return FastIO::HybridOutputStreamSpecCxxUnsafe::toUnsafe(swiftPart);
8587
}
8688

89+
// pragma MARK: std::shared_ptr<margelo::nitro::fastio::HybridCompressorSpec>
90+
std::shared_ptr<margelo::nitro::fastio::HybridCompressorSpec> create_std__shared_ptr_margelo__nitro__fastio__HybridCompressorSpec_(void* _Nonnull swiftUnsafePointer) {
91+
FastIO::HybridCompressorSpecCxx swiftPart = FastIO::HybridCompressorSpecCxxUnsafe::fromUnsafe(swiftUnsafePointer);
92+
return HybridContext::getOrCreate<margelo::nitro::fastio::HybridCompressorSpecSwift>(swiftPart);
93+
}
94+
void* _Nonnull get_std__shared_ptr_margelo__nitro__fastio__HybridCompressorSpec_(std__shared_ptr_margelo__nitro__fastio__HybridCompressorSpec_ cppType) {
95+
std::shared_ptr<margelo::nitro::fastio::HybridCompressorSpecSwift> swiftWrapper = std::dynamic_pointer_cast<margelo::nitro::fastio::HybridCompressorSpecSwift>(cppType);
96+
#ifdef NITRO_DEBUG
97+
if (swiftWrapper == nullptr) [[unlikely]] {
98+
throw std::runtime_error("Class \"HybridCompressorSpec\" is not implemented in Swift!");
99+
}
100+
#endif
101+
FastIO::HybridCompressorSpecCxx swiftPart = swiftWrapper->getSwiftPart();
102+
return FastIO::HybridCompressorSpecCxxUnsafe::toUnsafe(swiftPart);
103+
}
104+
105+
// pragma MARK: std::shared_ptr<margelo::nitro::fastio::HybridCompressorFactorySpec>
106+
std::shared_ptr<margelo::nitro::fastio::HybridCompressorFactorySpec> create_std__shared_ptr_margelo__nitro__fastio__HybridCompressorFactorySpec_(void* _Nonnull swiftUnsafePointer) {
107+
FastIO::HybridCompressorFactorySpecCxx swiftPart = FastIO::HybridCompressorFactorySpecCxxUnsafe::fromUnsafe(swiftUnsafePointer);
108+
return HybridContext::getOrCreate<margelo::nitro::fastio::HybridCompressorFactorySpecSwift>(swiftPart);
109+
}
110+
void* _Nonnull get_std__shared_ptr_margelo__nitro__fastio__HybridCompressorFactorySpec_(std__shared_ptr_margelo__nitro__fastio__HybridCompressorFactorySpec_ cppType) {
111+
std::shared_ptr<margelo::nitro::fastio::HybridCompressorFactorySpecSwift> swiftWrapper = std::dynamic_pointer_cast<margelo::nitro::fastio::HybridCompressorFactorySpecSwift>(cppType);
112+
#ifdef NITRO_DEBUG
113+
if (swiftWrapper == nullptr) [[unlikely]] {
114+
throw std::runtime_error("Class \"HybridCompressorFactorySpec\" is not implemented in Swift!");
115+
}
116+
#endif
117+
FastIO::HybridCompressorFactorySpecCxx swiftPart = swiftWrapper->getSwiftPart();
118+
return FastIO::HybridCompressorFactorySpecCxxUnsafe::toUnsafe(swiftPart);
119+
}
120+
87121
// pragma MARK: std::shared_ptr<margelo::nitro::fastio::HybridDuplexStreamSpec>
88122
std::shared_ptr<margelo::nitro::fastio::HybridDuplexStreamSpec> create_std__shared_ptr_margelo__nitro__fastio__HybridDuplexStreamSpec_(void* _Nonnull swiftUnsafePointer) {
89123
FastIO::HybridDuplexStreamSpecCxx swiftPart = FastIO::HybridDuplexStreamSpecCxxUnsafe::fromUnsafe(swiftUnsafePointer);

0 commit comments

Comments
 (0)