Skip to content

Add Store.runIsolated to run database ops in the background #384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions objectbox/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

* Support [ObjectBox Admin](https://docs.objectbox.io/data-browser) for Android apps to browse
the database. #148
* Add `Store.runIsolated` to run database operations (asynchronous) in the background
(requires Flutter 2.8.0/Dart 2.15.0 or newer). It spawns an isolate, runs the given callback in that
isolate with its own Store and returns the result of the callback. This is similar to Flutters
compute, but with the callback having access to a Store. #384
* Add `Store.attach` to attach to a Store opened in a directory. This is an improved replacement for
`Store.fromReference` to share a Store across isolates. It is no longer required to pass a
Store reference and the underlying Store remains open until the last instance is closed. #376
Expand Down
98 changes: 88 additions & 10 deletions objectbox/lib/src/native/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class Store {
final _reader = ReaderWithCBuffer();
Transaction? _tx;

/// Path to the database directory.
final String directoryPath;

/// Absolute path to the database directory, used for open check.
final String _absoluteDirectoryPath;

Expand Down Expand Up @@ -88,6 +91,7 @@ class Store {
String? macosApplicationGroup})
: _weak = false,
_queriesCaseSensitiveDefault = queriesCaseSensitiveDefault,
directoryPath = _safeDirectoryPath(directory),
_absoluteDirectoryPath =
path.context.canonicalize(_safeDirectoryPath(directory)) {
try {
Expand All @@ -114,13 +118,11 @@ class Store {

try {
checkObx(C.opt_model(opt, model.ptr));
if (directory != null && directory.isNotEmpty) {
final cStr = directory.toNativeUtf8();
try {
checkObx(C.opt_directory(opt, cStr.cast()));
} finally {
malloc.free(cStr);
}
final cStr = directoryPath.toNativeUtf8();
try {
checkObx(C.opt_directory(opt, cStr.cast()));
} finally {
malloc.free(cStr);
}
if (maxDBSizeInKB != null && maxDBSizeInKB > 0) {
C.opt_max_db_size_in_kb(opt, maxDBSizeInKB);
Expand Down Expand Up @@ -199,6 +201,7 @@ class Store {
{bool queriesCaseSensitiveDefault = true})
// must not close the same native store twice so [_weak]=true
: _weak = true,
directoryPath = '',
_absoluteDirectoryPath = '',
_queriesCaseSensitiveDefault = queriesCaseSensitiveDefault {
// see [reference] for serialization order
Expand Down Expand Up @@ -231,6 +234,7 @@ class Store {
// _weak = false so store can be closed.
: _weak = false,
_queriesCaseSensitiveDefault = queriesCaseSensitiveDefault,
directoryPath = _safeDirectoryPath(directoryPath),
_absoluteDirectoryPath =
path.context.canonicalize(_safeDirectoryPath(directoryPath)) {
try {
Expand All @@ -240,12 +244,12 @@ class Store {
// overlap.
_checkStoreDirectoryNotOpen();

final path = _safeDirectoryPath(directoryPath);
final pathCStr = path.toNativeUtf8();
final pathCStr = this.directoryPath.toNativeUtf8();
try {
if (debugLogs) {
final isOpen = C.store_is_open(pathCStr.cast());
print('Attaching to store... path=$path isOpen=$isOpen');
print(
'Attaching to store... path=${this.directoryPath} isOpen=$isOpen');
}
_cStore = C.store_attach(pathCStr.cast());
} finally {
Expand Down Expand Up @@ -378,6 +382,45 @@ class Store {
return _runInTransaction(mode, (tx) => fn());
}

// Isolate entry point must be static or top-level.
static Future<void> _callFunctionWithStoreInIsolate<P, R>(
_IsoPass<P, R> isoPass) async {
final store = Store.attach(isoPass.model, isoPass.dbDirectoryPath,
queriesCaseSensitiveDefault: isoPass.queriesCaseSensitiveDefault);
final result = await isoPass.runFn(store);
store.close();
// Note: maybe replace with Isolate.exit (and remove kill call in
// runIsolated) once min Dart SDK 2.15.
isoPass.resultPort?.send(result);
}

/// Spawns an isolate, runs [callback] in that isolate passing it [param] with
/// its own Store and returns the result of callback.
///
/// Instances of [callback] must be top-level functions or static methods
/// of classes, not closures or instance methods of objects.
///
/// Note: this requires Dart 2.15.0 or newer
/// (shipped with Flutter 2.8.0 or newer).
Future<R> runIsolated<P, R>(
TxMode mode, FutureOr<R> Function(Store, P) callback, P param) async {
final resultPort = ReceivePort();
// Await isolate spawn to avoid waiting forever if it fails to spawn.
final isolate = await Isolate.spawn(
_callFunctionWithStoreInIsolate,
_IsoPass(_defs, directoryPath, _queriesCaseSensitiveDefault,
resultPort.sendPort, callback, param));
// Use Completer to return result so type is not lost.
final result = Completer<R>();
resultPort.listen((dynamic message) {
result.complete(message as R);
});
await result.future;
resultPort.close();
isolate.kill();
return result.future;
}

/// Internal only - bypasses the main checks for async functions, you may
/// only pass synchronous callbacks!
R _runInTransaction<R>(TxMode mode, R Function(Transaction) fn) {
Expand Down Expand Up @@ -491,3 +534,38 @@ final _openStoreDirectories = HashSet<String>();
/// Otherwise, it's we can distinguish at runtime whether a function is async.
final _nullSafetyEnabled = _nullReturningFn is! Future Function();
final _nullReturningFn = () => null;

/// Captures everything required to create a "copy" of a store in an isolate
/// and run user code.
@immutable
class _IsoPass<P, R> {
final ModelDefinition model;

/// Used to attach to store in separate isolate
/// (may be replaced in the future).
final String dbDirectoryPath;

final bool queriesCaseSensitiveDefault;

/// Non-void functions can use this port to receive the result.
final SendPort? resultPort;

/// Parameter passed to [callback].
final P param;

/// To be called in isolate.
final FutureOr<R> Function(Store, P) callback;

const _IsoPass(
this.model,
this.dbDirectoryPath,
// ignore: avoid_positional_boolean_parameters
this.queriesCaseSensitiveDefault,
this.resultPort,
this.callback,
this.param);

/// Calls [callback] inside this class so types are not lost
/// (if called in isolate types would be dynamic instead of P and R).
FutureOr<R> runFn(Store store) => callback(store, param);
}
40 changes: 40 additions & 0 deletions objectbox/test/basics_test.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import 'dart:async';
import 'dart:ffi' as ffi;
import 'dart:io';
import 'dart:isolate';

import 'package:async/async.dart';
import 'package:meta/meta.dart';
import 'package:objectbox/internal.dart';
import 'package:objectbox/src/native/bindings/bindings.dart';
import 'package:objectbox/src/native/bindings/helpers.dart';
Expand Down Expand Up @@ -192,6 +194,44 @@ void main() {
store.close();
Directory('basics').deleteSync(recursive: true);
});

test('store_runInIsolatedTx', () async {
final env = TestEnv('basics');
final id = env.box.put(TestEntity(tString: 'foo'));
final futureResult =
env.store.runIsolated(TxMode.write, readStringAndRemove, id);
print('Count in main isolate: ${env.box.count()}');
final String x;
try {
x = await futureResult;
} catch (e) {
final dartVersion = RegExp('([0-9]+).([0-9]+).([0-9]+)')
.firstMatch(Platform.version)
?.group(0);
if (dartVersion != null && dartVersion.compareTo('2.15.0') < 0) {
print('runIsolated requires Dart 2.15, ignoring error.');
env.closeAndDelete();
return;
} else {
rethrow;
}
}
expect(x, 'foo!');
expect(env.box.count(), 0); // Must be removed once awaited
env.closeAndDelete();
});
}

Future<String> readStringAndRemove(Store store, int id) async {
var box = store.box<TestEntity>();
var testEntity = box.get(id);
final result = testEntity!.tString! + '!';
print('Result in 2nd isolate: $result');
final removed = box.remove(id);
print('Removed in 2nd isolate: $removed');
print('Count in 2nd isolate after remove: ${box.count()}');
// Pointless Future to test async functions are supported.
return await Future.delayed(const Duration(milliseconds: 10), () => result);
}

class StoreAttachIsolateInit {
Expand Down