Skip to content

Commit 8a94543

Browse files
authored
feat: update streaming api after implementing android (#47)
* init * chore: rework * update * remove * tweaks * nit * clean-up * tweak * clean-up
1 parent 30d1234 commit 8a94543

27 files changed

+229
-220
lines changed

example/tests/filesystem.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export function FileSystemUI() {
3232

3333
const logContents = async () => {
3434
for await (const chunk of file!.stream()) {
35-
console.log(chunk)
35+
console.log('Chunk length: ', chunk.length)
3636
}
3737
}
3838

packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridFileSystem.kt

+45-45
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ class HybridFileSystem : HybridFileSystemSpec() {
1313
private val context = NitroModules.applicationContext
1414
?: throw RuntimeException("Application context is missing")
1515

16-
init {
17-
context.addActivityEventListener(activityEventListener)
18-
}
19-
2016
override fun getMetadata(path: String): Metadata {
2117
val uri = Uri.parse(path)
2218
val document = DocumentFile.fromSingleUri(context, uri)
@@ -87,57 +83,61 @@ class HybridFileSystem : HybridFileSystemSpec() {
8783
}
8884
}
8985

90-
companion object {
91-
private const val FILE_PICKER_REQUEST_CODE = 1001
92-
private var pendingPromise: Promise<Array<String>>? = null
93-
94-
val activityEventListener = object : BaseActivityEventListener() {
95-
override fun onActivityResult(
96-
activity: Activity,
97-
requestCode: Int,
98-
resultCode: Int,
99-
data: Intent?
100-
) {
101-
handleActivityResult(requestCode, resultCode, data)
102-
}
86+
private var pendingPromise: Promise<Array<String>>? = null
87+
88+
private val activityEventListener = object : BaseActivityEventListener() {
89+
override fun onActivityResult(
90+
activity: Activity,
91+
requestCode: Int,
92+
resultCode: Int,
93+
data: Intent?
94+
) {
95+
handleActivityResult(requestCode, resultCode, data)
10396
}
97+
}.also { listener ->
98+
// tbd: register/deregister accordingly
99+
context.addActivityEventListener(listener)
100+
}
104101

105-
fun handleActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
106-
if (requestCode == FILE_PICKER_REQUEST_CODE) {
107-
val promise = pendingPromise
108-
?: throw RuntimeException("Promise missing")
109-
110-
if (resultCode == Activity.RESULT_OK && data != null) {
111-
try {
112-
val paths = mutableListOf<String>()
113-
114-
if (data.clipData != null) {
115-
// Multiple files
116-
for (i in 0 until data.clipData!!.itemCount) {
117-
data.clipData!!.getItemAt(i).uri.toString().let {
118-
paths.add(it)
119-
}
120-
}
121-
} else {
122-
// Single file
123-
data.data?.toString()?.let {
102+
private fun handleActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
103+
if (requestCode == FILE_PICKER_REQUEST_CODE) {
104+
val promise = pendingPromise
105+
?: throw RuntimeException("Promise missing")
106+
107+
if (resultCode == Activity.RESULT_OK && data != null) {
108+
try {
109+
val paths = mutableListOf<String>()
110+
111+
if (data.clipData != null) {
112+
// Multiple files
113+
for (i in 0 until data.clipData!!.itemCount) {
114+
data.clipData!!.getItemAt(i).uri.toString().let {
124115
paths.add(it)
125116
}
126117
}
127-
128-
promise.resolve(paths.toTypedArray())
129-
} catch (e: Exception) {
130-
promise.reject(Error(e.message))
118+
} else {
119+
// Single file
120+
data.data?.toString()?.let {
121+
paths.add(it)
122+
}
131123
}
132-
} else {
133-
promise.reject(Error("File picker cancelled"))
134-
}
135124

136-
pendingPromise = null
125+
promise.resolve(paths.toTypedArray())
126+
} catch (e: Exception) {
127+
promise.reject(Error(e.message))
128+
}
129+
} else {
130+
promise.reject(Error("File picker cancelled"))
137131
}
132+
133+
pendingPromise = null
138134
}
139135
}
140136

141137
override val memorySize: Long
142138
get() = 0L
143-
}
139+
140+
companion object {
141+
private const val FILE_PICKER_REQUEST_CODE = 1001
142+
}
143+
}

packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridInputStream.kt

+29-27
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,46 @@
11
package com.margelo.nitro.fastio
22

33
import com.margelo.nitro.core.ArrayBuffer
4+
import com.margelo.nitro.core.Promise
45
import java.io.InputStream
56

67
class HybridInputStream(private val stream: InputStream) : HybridInputStreamSpec() {
7-
private var isOpen = true
8-
9-
override fun hasBytesAvailable(): Boolean {
10-
if (!isOpen) return false
11-
12-
return try {
13-
stream.available() > 0
14-
} catch (e: Exception) {
15-
false
8+
override fun read(): Promise<ArrayBuffer> {
9+
return Promise<ArrayBuffer>().apply {
10+
try {
11+
val bytes = ByteArray(HybridStreamFactory.BUFFER_SIZE)
12+
val bytesRead = stream.read(bytes, 0, bytes.size)
13+
14+
when {
15+
bytesRead == -1 -> {
16+
// End of stream
17+
resolve(ArrayBuffer.allocate(0))
18+
}
19+
bytesRead > 0 -> {
20+
val arrayBuffer = ArrayBuffer.allocate(bytesRead)
21+
22+
val destBuffer = arrayBuffer.getBuffer(false)
23+
destBuffer.put(bytes, 0, bytesRead)
24+
25+
resolve(arrayBuffer)
26+
}
27+
else -> {
28+
// Error case
29+
reject(Error("Unexpected error reading stream"))
30+
}
31+
}
32+
} catch (e: Exception) {
33+
reject(Error(e.message))
34+
}
1635
}
1736
}
1837

19-
override fun read(buffer: ArrayBuffer, maxLength: Double): Double {
20-
val byteBuffer = buffer.getBuffer(false)
21-
22-
val tempBuffer = ByteArray(minOf(maxLength.toInt(), buffer.size))
23-
24-
val bytesRead = stream.read(tempBuffer, 0, tempBuffer.size)
25-
26-
if (bytesRead > 0) {
27-
byteBuffer.put(tempBuffer, 0, bytesRead)
28-
}
29-
30-
return bytesRead.toDouble()
31-
}
32-
3338
override fun open() {
34-
// no-op
39+
// No explicit open needed for Java InputStreams
3540
}
3641

3742
override fun close() {
38-
if (!isOpen) return
39-
4043
stream.close()
41-
isOpen = false
4244
}
4345

4446
override val memorySize: Long

packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridOutputStream.kt

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,37 @@
11
package com.margelo.nitro.fastio
22

33
import com.margelo.nitro.core.ArrayBuffer
4+
import com.margelo.nitro.core.Promise
5+
import java.io.OutputStream
46

5-
class HybridOutputStream : HybridOutputStreamSpec() {
6-
override fun hasSpaceAvailable(): Boolean {
7-
throw NotImplementedError("HybridOutputStream.hasSpaceAvailable() not implemented")
8-
}
7+
class HybridOutputStream(private val stream: OutputStream) : HybridOutputStreamSpec() {
8+
9+
override fun write(buffer: ArrayBuffer): Promise<Unit> {
10+
val byteBuffer = buffer.getBuffer(false)
11+
val bytes = ByteArray(buffer.size)
12+
byteBuffer.get(bytes)
913

10-
override fun write(buffer: ArrayBuffer, maxLength: Double): Double {
11-
throw NotImplementedError("HybridOutputStream.write() not implemented")
14+
return Promise<Unit>().apply {
15+
try {
16+
stream.write(bytes)
17+
resolve(Unit)
18+
} catch (e: Exception) {
19+
reject(Error(e.message))
20+
}
21+
}
1222
}
1323

1424
override fun open() {
15-
throw NotImplementedError("HybridOutputStream.open() not implemented")
25+
// No explicit open needed for Java OutputStreams
1626
}
1727

1828
override fun close() {
19-
throw NotImplementedError("HybridOutputStream.close() not implemented")
29+
try {
30+
stream.flush()
31+
stream.close()
32+
} catch (e: Exception) {
33+
println("Error closing stream: ${e.message}")
34+
}
2035
}
2136

2237
override val memorySize: Long

packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridStreamFactory.kt

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class HybridStreamFactory : HybridStreamFactorySpec() {
2222
get() = 0L
2323

2424
companion object {
25+
@JvmStatic
26+
val BUFFER_SIZE: Int = getBufferSize()
27+
2528
@JvmStatic
2629
private external fun getBufferSize(): Int
2730
}

packages/react-native-fast-io/ios/HybridDuplexStream.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class HybridDuplexStream : HybridDuplexStreamSpec {
1515
var inputStreamRef: InputStream? = InputStream()
1616
var outputStreamRef: OutputStream? = OutputStream(toMemory: ())
1717

18-
Stream.getBoundStreams(withBufferSize: HybridStreamFactory.BUFFER_SIZE, inputStream: &inputStreamRef, outputStream: &outputStreamRef)
18+
Stream.getBoundStreams(withBufferSize: Int(HybridStreamFactory.BUFFER_SIZE), inputStream: &inputStreamRef, outputStream: &outputStreamRef)
1919

2020
guard let inputStreamRef, let outputStreamRef else {
2121
fatalError("Could not create streams")

packages/react-native-fast-io/ios/HybridInputStream.swift

+22-6
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,32 @@ class HybridInputStream : HybridInputStreamSpec {
1515
self.stream = stream
1616
}
1717

18-
func hasBytesAvailable() throws -> Bool {
19-
stream.hasBytesAvailable
20-
}
21-
2218
func open() throws -> Void {
2319
stream.open()
2420
}
2521

26-
func read(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double {
27-
Double(stream.read(buffer.data, maxLength: Int(maxLength)))
22+
func read() throws -> Promise<ArrayBufferHolder> {
23+
let promise = Promise<ArrayBufferHolder>()
24+
25+
Task {
26+
let size = Int(HybridStreamFactory.BUFFER_SIZE)
27+
let data = UnsafeMutablePointer<UInt8>.allocate(capacity: size)
28+
29+
let bytesRead = stream.read(data, maxLength: size)
30+
31+
let deleteFunc = {
32+
data.deallocate()
33+
}
34+
35+
if (bytesRead >= 0) {
36+
promise.resolve(withResult: ArrayBufferHolder.wrap(dataWithoutCopy: data, size: bytesRead, onDelete: deleteFunc))
37+
} else {
38+
deleteFunc()
39+
promise.reject(withError: stream.streamError ?? RuntimeError.error(withMessage: "Unexpected error reading stream"))
40+
}
41+
}
42+
43+
return promise
2844
}
2945

3046
func close() {

packages/react-native-fast-io/ios/HybridOutputStream.swift

+16-6
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,26 @@ class HybridOutputStream : HybridOutputStreamSpec {
1515
self.stream = stream
1616
}
1717

18-
func hasSpaceAvailable() throws -> Bool {
19-
stream.hasSpaceAvailable
20-
}
21-
2218
func open() throws -> Void {
2319
stream.open()
2420
}
2521

26-
func write(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double {
27-
Double(stream.write(buffer.data, maxLength: Int(maxLength)))
22+
func write(buffer: ArrayBufferHolder) throws -> Promise<Void> {
23+
let promise = Promise<Void>()
24+
25+
let data = buffer.data
26+
let length = buffer.size
27+
28+
Task {
29+
let bytesWritten = stream.write(data, maxLength: length)
30+
if (bytesWritten == length) {
31+
promise.resolve(withResult: ())
32+
} else {
33+
promise.reject(withError: stream.streamError ?? RuntimeError.error(withMessage: "Unexpected error writing to stream"))
34+
}
35+
}
36+
37+
return promise
2838
}
2939

3040
func close() {

packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.cpp

+16-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
// Forward declaration of `ArrayBuffer` to properly resolve imports.
1111
namespace NitroModules { class ArrayBuffer; }
1212

13+
#include <future>
1314
#include <NitroModules/ArrayBuffer.hpp>
15+
#include <NitroModules/JPromise.hpp>
1416
#include <NitroModules/JArrayBuffer.hpp>
1517

1618
namespace margelo::nitro::fastio {
@@ -34,15 +36,21 @@ namespace margelo::nitro::fastio {
3436

3537

3638
// Methods
37-
bool JHybridInputStreamSpec::hasBytesAvailable() {
38-
static const auto method = _javaPart->getClass()->getMethod<jboolean()>("hasBytesAvailable");
39+
std::future<std::shared_ptr<ArrayBuffer>> JHybridInputStreamSpec::read() {
40+
static const auto method = _javaPart->getClass()->getMethod<jni::local_ref<JPromise::javaobject>()>("read");
3941
auto __result = method(_javaPart);
40-
return static_cast<bool>(__result);
41-
}
42-
double JHybridInputStreamSpec::read(const std::shared_ptr<ArrayBuffer>& buffer, double maxLength) {
43-
static const auto method = _javaPart->getClass()->getMethod<double(jni::alias_ref<JArrayBuffer::javaobject> /* buffer */, double /* maxLength */)>("read");
44-
auto __result = method(_javaPart, JArrayBuffer::wrap(buffer), maxLength);
45-
return __result;
42+
return [&]() {
43+
auto __promise = std::make_shared<std::promise<std::shared_ptr<ArrayBuffer>>>();
44+
__result->cthis()->addOnResolvedListener([=](const jni::alias_ref<jni::JObject>& __boxedResult) {
45+
auto __result = jni::static_ref_cast<JArrayBuffer::javaobject>(__boxedResult);
46+
__promise->set_value(__result->cthis()->getArrayBuffer());
47+
});
48+
__result->cthis()->addOnRejectedListener([=](const jni::alias_ref<jni::JString>& __message) {
49+
std::runtime_error __error(__message->toStdString());
50+
__promise->set_exception(std::make_exception_ptr(__error));
51+
});
52+
return __promise->get_future();
53+
}();
4654
}
4755
void JHybridInputStreamSpec::open() {
4856
static const auto method = _javaPart->getClass()->getMethod<void()>("open");

packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.hpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ namespace margelo::nitro::fastio {
5151

5252
public:
5353
// Methods
54-
bool hasBytesAvailable() override;
55-
double read(const std::shared_ptr<ArrayBuffer>& buffer, double maxLength) override;
54+
std::future<std::shared_ptr<ArrayBuffer>> read() override;
5655
void open() override;
5756
void close() override;
5857

0 commit comments

Comments
 (0)