From b9583998e63e0ee6490a911f9fcb8291ec1b4ab6 Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Wed, 15 Jul 2015 15:44:25 -0700 Subject: [PATCH 1/4] True serialization of TypedArrays. Serializes TypeArrays and their backing ArrayBuffers, and deserializes them such that the resulting object is also a typed array. The deserialized array is constructed with a backing ArrayBuffer of the same size and contents as the serialized array, which is how typed array serialization works in Chrome/Blink webworkers. --- src/bson.cc | 115 +++++++++++++++++++++++++++++++++++- src/bson.h | 17 ++++++ test/test30_postmessages.js | 57 ++++++++++++++++++ 3 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 test/test30_postmessages.js diff --git a/src/bson.cc b/src/bson.cc index c8d8974..02b07db 100644 --- a/src/bson.cc +++ b/src/bson.cc @@ -135,6 +135,62 @@ template void BSONSerializer::SerializeArray(const Handle& this->CommitSize(documentSize); } +TypedArrayType getTypedArrayType(Local value) { + if (value->IsUint8Array()) { return TYPED_ARRAY_UINT8; } + else if (value->IsUint8ClampedArray()) { return TYPED_ARRAY_UINT8_CLAMPED; } + else if (value->IsInt8Array()) { return TYPED_ARRAY_INT8; } + else if (value->IsUint16Array()) { return TYPED_ARRAY_UINT16; } + else if (value->IsInt16Array()) { return TYPED_ARRAY_INT16; } + else if (value->IsUint32Array()) { return TYPED_ARRAY_UINT32; } + else if (value->IsInt32Array()) { return TYPED_ARRAY_INT32; } + else if (value->IsFloat32Array()) { return TYPED_ARRAY_FLOAT32; } + else if (value->IsFloat64Array()) { return TYPED_ARRAY_FLOAT64; } + else { + ThrowAllocatedStringException(64, "Unknown typed array sub-type"); + } + return static_cast(0); // unreachable. +} + +int getTypedArrayElementBytesize(TypedArrayType type) +{ + switch (type) { + case TYPED_ARRAY_INT8: + case TYPED_ARRAY_UINT8: + case TYPED_ARRAY_UINT8_CLAMPED: + return 1; + + case TYPED_ARRAY_INT16: + case TYPED_ARRAY_UINT16: + return 2; + + case TYPED_ARRAY_INT32: + case TYPED_ARRAY_UINT32: + case TYPED_ARRAY_FLOAT32: + return 4; + + case TYPED_ARRAY_FLOAT64: + return 8; + } + ThrowAllocatedStringException(64, "Unhandled type array type %d", type); +} + +unsigned char* arrayBufferData(Local buffer) { + // Using buffer.ByteLength isn't strictly necessary here - we could still get a pointer to the + // beginning of the array by making a 1-element array. + Local fullBufferView = Uint8Array::New(buffer, 0, buffer->ByteLength()); + // GetIndexedPropertiesExternalArrayData only works when the value is treated as an Object. + Local bufferViewAsObject = fullBufferView->ToObject(); + unsigned char* p = static_cast(bufferViewAsObject->GetIndexedPropertiesExternalArrayData()); + if (p == NULL) { + // See: https://groups.google.com/forum/#!searchin/v8-users/ArrayBuffer/v8-users/iiZr67iAfU0/nbmc9Lo23BYJ + // GetIndexedPropertiesExternalArrayData seems to work so far, but there are reports that + // it sometimes returns null. From reading the v8 source its not clear how a typed array + // could ever return null, but we should keep an eye on this. + ThrowAllocatedStringException(64, "GetIndexedPropertiesExternalArrayData() returned NULL"); + } + return p; +} + // This is templated so that we can use this function to both count the number of bytes, and to serialize those bytes. // The template approach eliminates almost all of the inspection of values unless they're required (eg. string lengths) // and ensures that there is always consistency between bytes counted and bytes written by design. @@ -302,8 +358,35 @@ template void BSONSerializer::SerializeValue(void* typeLocation, this->CommitType(typeLocation, BSON_TYPE_MAX_KEY); } } + else if (value->IsArrayBufferView()) { + if (!value->IsTypedArray()) { + ThrowAllocatedStringException(128, "ArrayBufferViews other than Typed Arrays not supported"); + } + TypedArrayType arrayType = getTypedArrayType(value); + + // We serialize the entire ArrayBuffer, even if the array is just a small view into + // a larger buffer. This is how webworkers in chrome behave. + ArrayBufferView* bufferView = ArrayBufferView::Cast(*value); + Local buffer = bufferView->Buffer(); + + size_t viewLength = bufferView->ByteLength(); + size_t viewOffset = bufferView->ByteOffset(); + size_t bufferLength = buffer->ByteLength(); + + unsigned char* p = arrayBufferData(buffer); + if (p == NULL) { return; } + + this->CommitType(typeLocation, BSON_TYPE_TYPED_ARRAY); + // If you change the order of writes here, be sure to update the order of the corresponding + // reads in DeserializeValue. + this->WriteByte(arrayType); + this->WriteInt64(viewLength); + this->WriteInt64(viewOffset); + this->WriteInt64(bufferLength); + for (size_t i = 0; i < bufferLength; i++) { this->WriteByte(p[i]); } + } else if(Buffer::HasInstance(value)) - { + { this->CommitType(typeLocation, BSON_TYPE_BINARY); #if NODE_MAJOR_VERSION == 0 && NODE_MINOR_VERSION < 3 @@ -588,6 +671,36 @@ Handle BSONDeserializer::DeserializeValue(BsonType type, bool promoteLong case BSON_TYPE_MAX_KEY: return NanNew(bson->maxKeyConstructor)->NewInstance(); + case BSON_TYPE_TYPED_ARRAY: + { + TypedArrayType arrayType = static_cast(ReadByte()); + size_t viewLength = ReadInt64(); + size_t viewOffset = ReadInt64(); + size_t bufferLength = ReadInt64(); + + int elementCount = viewLength / getTypedArrayElementBytesize(arrayType); + Local buffer = ArrayBuffer::New(Isolate::GetCurrent(), bufferLength); + + unsigned char* p = arrayBufferData(buffer); + if (p == NULL) { return NanNull(); } + + for (size_t i = 0; i < bufferLength; i++) { p[i] = ReadByte(); } + + switch (arrayType) { + case TYPED_ARRAY_INT8: return Int8Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_UINT8: return Uint8Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_UINT8_CLAMPED: return Uint8ClampedArray::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_INT16: return Int16Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_UINT16: return Uint16Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_INT32: return Int32Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_UINT32: return Uint32Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_FLOAT32: return Float32Array::New(buffer, viewOffset, elementCount); + case TYPED_ARRAY_FLOAT64: return Float64Array::New(buffer, viewOffset, elementCount); + } + ThrowAllocatedStringException(64, "Unhandled TypedArrayType arrayType: %d", arrayType); + return NanNull();; + } + default: ThrowAllocatedStringException(64, "Unhandled BSON Type: %d", type); } diff --git a/src/bson.h b/src/bson.h index 3cfc2bd..5121cbe 100644 --- a/src/bson.h +++ b/src/bson.h @@ -35,10 +35,27 @@ enum BsonType BSON_TYPE_INT = 16, BSON_TYPE_TIMESTAMP = 17, BSON_TYPE_LONG = 18, + BSON_TYPE_TYPED_ARRAY = 19, BSON_TYPE_MAX_KEY = 0x7f, BSON_TYPE_MIN_KEY = 0xff }; +enum TypedArrayType +{ + TYPED_ARRAY_INT8 = 1, + TYPED_ARRAY_UINT8 = 2, + TYPED_ARRAY_UINT8_CLAMPED = 3, + + TYPED_ARRAY_INT16 = 4, + TYPED_ARRAY_UINT16 = 5, + + TYPED_ARRAY_INT32 = 6, + TYPED_ARRAY_UINT32 = 7, + + TYPED_ARRAY_FLOAT32 = 8, + TYPED_ARRAY_FLOAT64 = 9, +}; + //=========================================================================== template class BSONSerializer; diff --git a/test/test30_postmessages.js b/test/test30_postmessages.js new file mode 100644 index 0000000..1403889 --- /dev/null +++ b/test/test30_postmessages.js @@ -0,0 +1,57 @@ + + +var t= require('../'); + +var worker = new t.Worker(function() { + this.onmessage = function(e) { + postMessage(e.data); + } +}); + +var errors = []; +worker.onmessage = function(event) { + if (event.data.terminate) { + for (var i = 0; i < errors.length; i++) { + console.log('ERROR ' + errors[i]); + } + if (errors.length == 0) { + console.log("All tests OK"); + } + worker.terminate(); + return; + } + var array = event.data.array; + if (!(array instanceof Uint8Array)) { + errors.push('ERROR: event.data.array must be Uint8Array'); + } + if (event.data.test == 'regular') { + for (var i = 0; i < array.length; i++) { + if (array[i] != i) { + errors.push('ERROR: incorrect data at ' + i); + return; + } + } + console.log('regular deserialization OK'); + } + if (event.data.test == 'subarray') { + for (var i = 0; i < array.length; i++) { + if (array[i] != i + 16) { + errors.push('ERROR: incorrect data at ' + i); + return; + } + } + console.log('sub-array deserialization OK'); + } +}; + +// Test a regular array. +var a = new Uint8Array(64); +for (var i = 0; i < 64; i++) { a[i] = i; } +worker.postMessage({ array: a, test: 'regular'}, [a.buffer]); + +// Test an array which is a view into a larger buffer. +var b = a.subarray(16, 32); +worker.postMessage({ array: b, test: 'subarray'}); + +worker.postMessage({ terminate: 1 }); + From 2b2d329325a335c167c5900f03c894103fd6e3c2 Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Thu, 16 Jul 2015 17:44:07 -0700 Subject: [PATCH 2/4] Replace POST_EVENT macro with function --- src/WebWorkerThreads.cc | 79 +++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/src/WebWorkerThreads.cc b/src/WebWorkerThreads.cc index 334a720..21f9a84 100644 --- a/src/WebWorkerThreads.cc +++ b/src/WebWorkerThreads.cc @@ -798,51 +798,52 @@ NAN_METHOD(processEmitSerialized) { NanReturnValue(args.This()); } -#define POST_EVENT(eventname) { \ - NanScope(); \ - int len = args.Length(); \ - \ - if (!len) NanReturnValue(args.This()); \ - \ - typeThread* thread= (typeThread*) NanGetIsolateData(Isolate::GetCurrent()); \ - \ - typeQueueItem* qitem= nuJobQueueItem(); \ - typeJob* job= (typeJob*) qitem->asPtr; \ - \ - job->jobType= kJobTypeEventSerialized; \ - job->typeEventSerialized.eventName= new String::Utf8Value(NanNew(eventname)); \ - job->typeEventSerialized.length= len; \ - \ - Local array= NanNew(len); \ - int i = 0; do { array->Set(i, args[i]); } while (++i < len); \ - \ - { \ - char* buffer; \ - BSON *bson = new BSON(); \ - size_t object_size; \ - Local object = bson->GetSerializeObject(array); \ - BSONSerializer counter(bson, false, false); \ - counter.SerializeDocument(object); \ - object_size = counter.GetSerializeSize(); \ - buffer = (char *)malloc(object_size); \ - BSONSerializer data(bson, false, false, buffer); \ - data.SerializeDocument(object); \ - job->typeEventSerialized.buffer= buffer; \ - job->typeEventSerialized.bufferSize= object_size; \ - } \ - \ - queue_push(qitem, &thread->outQueue); \ - if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); \ - \ - NanReturnValue(args.This()); \ +template +void postEvent(ArgType& args, const char* eventname) { + NanScope(); + int len = args.Length(); + + if (!len) NanReturnValue(args.This()); + + typeThread* thread= (typeThread*) NanGetIsolateData(Isolate::GetCurrent()); + + typeQueueItem* qitem= nuJobQueueItem(); + typeJob* job= (typeJob*) qitem->asPtr; + + job->jobType= kJobTypeEventSerialized; + job->typeEventSerialized.eventName= new String::Utf8Value(NanNew(eventname)); + job->typeEventSerialized.length= len; + + Local array= NanNew(len); + int i = 0; do { array->Set(i, args[i]); } while (++i < len); + + { + char* buffer; + BSON *bson = new BSON(); + size_t object_size; + Local object = bson->GetSerializeObject(array); + BSONSerializer counter(bson, false, false); + counter.SerializeDocument(object); + object_size = counter.GetSerializeSize(); + buffer = (char *)malloc(object_size); + BSONSerializer data(bson, false, false, buffer); + data.SerializeDocument(object); + job->typeEventSerialized.buffer= buffer; + job->typeEventSerialized.bufferSize= object_size; + } + + queue_push(qitem, &thread->outQueue); + if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); + + NanReturnValue(args.This()); } NAN_METHOD(postMessage) { - POST_EVENT("message"); + postEvent(args, "message"); } NAN_METHOD(postError) { - POST_EVENT("error"); + postEvent(args, "error"); } NAN_METHOD(threadEmit) { From cb004939f0ff3a075e014182182e550297b89314 Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Thu, 16 Jul 2015 17:40:12 -0700 Subject: [PATCH 3/4] Skeleton of postMessage transferrable support. Garbage collection of transferred arrays is not yet working. --- src/WebWorkerThreads.cc | 54 +++++++++++++-- src/bson.cc | 133 ++++++++++++++++++++++++++++++++---- src/bson.h | 26 ++++++- src/worker.js | 8 ++- src/worker.ls | 7 +- test/test30_postmessages.js | 24 +++++-- 6 files changed, 222 insertions(+), 30 deletions(-) diff --git a/src/WebWorkerThreads.cc b/src/WebWorkerThreads.cc index 21f9a84..ad607d9 100644 --- a/src/WebWorkerThreads.cc +++ b/src/WebWorkerThreads.cc @@ -757,6 +757,27 @@ NAN_METHOD(processEmit) { NanReturnValue(args.This()); } +static bool populateTransferables(Local transferablesArg, std::vector >& transferables) +{ + if (!transferablesArg->IsArray()) { return false; } + + Array* array = Array::Cast(*transferablesArg); + + transferables.empty(); + transferables.reserve(array->Length()); + for (unsigned int j = 0; j < array->Length(); j++) { + Local element = array->Get(j); + if (!element->IsArrayBuffer()) { + NanThrowError( + "second argument of post{Message,Error} must be an array of ArrayBuffer objects." + ); + return false; + } + transferables.push_back(Local::Cast(element)); + } + return true; +} + NAN_METHOD(processEmitSerialized) { NanScope(); @@ -775,19 +796,27 @@ NAN_METHOD(processEmitSerialized) { job->jobType= kJobTypeEventSerialized; job->typeEventSerialized.length= len-1; job->typeEventSerialized.eventName= new String::Utf8Value(args[0]); + + std::vector > transferables; + if (!args[1]->IsNull() && !populateTransferables(args[1], transferables)) + { + NanReturnValue(args.This()); + return; + } + Local array= NanNew(len-1); - int i = 1; do { array->Set(i-1, args[i]); } while (++i < len); + int i = 2; do { array->Set(i-2, args[i]); } while (++i < len); { char* buffer; BSON *bson = new BSON(); size_t object_size; Local object = bson->GetSerializeObject(array); - BSONSerializer counter(bson, false, false); + BSONSerializer counter(bson, false, false, &transferables); counter.SerializeDocument(object); object_size = counter.GetSerializeSize(); buffer = (char *)malloc(object_size); - BSONSerializer data(bson, false, false, buffer); + BSONSerializer data(bson, false, false, buffer, &transferables); data.SerializeDocument(object); job->typeEventSerialized.buffer= buffer; job->typeEventSerialized.bufferSize= object_size; @@ -805,7 +834,20 @@ void postEvent(ArgType& args, const char* eventname) { if (!len) NanReturnValue(args.This()); - typeThread* thread= (typeThread*) NanGetIsolateData(Isolate::GetCurrent()); + if (len > 2) { + NanThrowError("post{Message,Error} takes at most 2 arguments."); + NanReturnValue(args.This()); + } + std::vector > transferables; + if (len == 2) + { + if (!populateTransferables(args[1], transferables)) + { + NanReturnValue(args.This()); + } + } + + typeThread* thread= (typeThread*) NanGetIsolateData(args.GetIsolate()); typeQueueItem* qitem= nuJobQueueItem(); typeJob* job= (typeJob*) qitem->asPtr; @@ -822,11 +864,11 @@ void postEvent(ArgType& args, const char* eventname) { BSON *bson = new BSON(); size_t object_size; Local object = bson->GetSerializeObject(array); - BSONSerializer counter(bson, false, false); + BSONSerializer counter(bson, false, false, &transferables); counter.SerializeDocument(object); object_size = counter.GetSerializeSize(); buffer = (char *)malloc(object_size); - BSONSerializer data(bson, false, false, buffer); + BSONSerializer data(bson, false, false, buffer, &transferables); data.SerializeDocument(object); job->typeEventSerialized.buffer= buffer; job->typeEventSerialized.bufferSize= object_size; diff --git a/src/bson.cc b/src/bson.cc index 02b07db..f023321 100644 --- a/src/bson.cc +++ b/src/bson.cc @@ -191,6 +191,41 @@ unsigned char* arrayBufferData(Local buffer) { return p; } +template +int BSONSerializer::TransferableIndex(Local buffer) { + if (transferables != NULL) + { + for (size_t i = 0; i < transferables->size(); i++) + { + if (buffer->StrictEquals((*transferables)[i])) { return i; } + } + } + return -1; +} + +template +struct Externalizer; + +template <> +struct Externalizer { + static unsigned char* externalize(Local buffer, int bufferLength) + { + // We don't actually externalize the buffers until the DataStream step runs. + return NULL; + } +}; + +template <> +struct Externalizer { + static unsigned char* externalize(Local buffer, int bufferLength) + { + ArrayBuffer::Contents contents = buffer->Externalize(); + buffer->Neuter(); + return static_cast(contents.Data()); + } +}; + + // This is templated so that we can use this function to both count the number of bytes, and to serialize those bytes. // The template approach eliminates almost all of the inspection of values unless they're required (eg. string lengths) // and ensures that there is always consistency between bytes counted and bytes written by design. @@ -364,8 +399,8 @@ template void BSONSerializer::SerializeValue(void* typeLocation, } TypedArrayType arrayType = getTypedArrayType(value); - // We serialize the entire ArrayBuffer, even if the array is just a small view into - // a larger buffer. This is how webworkers in chrome behave. + // When an ArrayBuffer has not been specified as a transferable, we serialize the entire ArrayBuffer, + // even if the array is just a small view into a larger buffer. This is how webworkers in chrome behave. ArrayBufferView* bufferView = ArrayBufferView::Cast(*value); Local buffer = bufferView->Buffer(); @@ -373,7 +408,26 @@ template void BSONSerializer::SerializeValue(void* typeLocation, size_t viewOffset = bufferView->ByteOffset(); size_t bufferLength = buffer->ByteLength(); - unsigned char* p = arrayBufferData(buffer); + int transferableIdx = TransferableIndex(buffer); + bool transferable = transferableIdx >= 0; + unsigned char* p = NULL; + if (transferable) + { + if (buffer->IsExternal()) + { + p = arrayBufferData(buffer); + } + else + { + p = Externalizer::externalize(buffer, bufferLength); + } + transferableContents[transferableIdx] = p; + } + if (p == NULL) + { + p = arrayBufferData(buffer); + } + if (p == NULL) { return; } this->CommitType(typeLocation, BSON_TYPE_TYPED_ARRAY); @@ -383,7 +437,17 @@ template void BSONSerializer::SerializeValue(void* typeLocation, this->WriteInt64(viewLength); this->WriteInt64(viewOffset); this->WriteInt64(bufferLength); - for (size_t i = 0; i < bufferLength; i++) { this->WriteByte(p[i]); } + if (transferable) + { + this->WritePointer(p); + } + else + { + // The deserializer uses NULL as the signal to read in the ArrayBuffer contents from + // the data stream. + this->WritePointer(NULL); + for (size_t i = 0; i < bufferLength; i++) { this->WriteByte(p[i]); } + } } else if(Buffer::HasInstance(value)) { @@ -550,6 +614,33 @@ Handle BSONDeserializer::DeserializeArrayInternal(bool promoteLongs) return returnArray; } +class ArrayGCInfo { + public: + ArrayGCInfo(Isolate* i, Local buffer, unsigned char* _p) + : pHandle(new Persistent(i, buffer)), p(reinterpret_cast(_p)) + { + pHandle->SetWeak(this, &ArrayGCInfo::WeakCallback); + } + + ~ArrayGCInfo() + { + // XXX This never runs, and I haven't been able to figure out why yet. + printf("XXXXX Cleaning up ArrayBuffer! XXXXXXX\n"); + delete pHandle; + delete[] p; + pHandle = NULL; + p = NULL; + } + + static void WeakCallback(const WeakCallbackData& data) { + ArrayGCInfo* info = data.GetParameter(); + delete info; + } + private: + Persistent* pHandle; + char* p; +}; + Handle BSONDeserializer::DeserializeValue(BsonType type, bool promoteLongs) { switch(type) @@ -677,14 +768,30 @@ Handle BSONDeserializer::DeserializeValue(BsonType type, bool promoteLong size_t viewLength = ReadInt64(); size_t viewOffset = ReadInt64(); size_t bufferLength = ReadInt64(); + unsigned char* pointer = static_cast(ReadPointer()); int elementCount = viewLength / getTypedArrayElementBytesize(arrayType); - Local buffer = ArrayBuffer::New(Isolate::GetCurrent(), bufferLength); - unsigned char* p = arrayBufferData(buffer); - if (p == NULL) { return NanNull(); } - - for (size_t i = 0; i < bufferLength; i++) { p[i] = ReadByte(); } + Isolate* i = Isolate::GetCurrent(); + Local buffer; + if (pointer == NULL) + { + // ArrayBuffer was not transferred, must allocate new one. + buffer = ArrayBuffer::New(i, bufferLength); + unsigned char* p = arrayBufferData(buffer); + if (p == NULL) { return NanNull(); } + for (size_t i = 0; i < bufferLength; i++) { p[i] = ReadByte(); } + } + else + { + // Create buffer from existing memory. + buffer = ArrayBuffer::New(i, pointer, bufferLength); + // Create Persistent handle with WeakCallback to free memory after buffer is GCed. + // XXX: If this ArrayBuffer is later transferred out of this thread, we must cancel + // this weak callback or we'll get a double delete. + // (That is, if the weak callback ever ran, which it currently doesn't) + new ArrayGCInfo(Isolate::GetCurrent(), buffer, pointer); + } switch (arrayType) { case TYPED_ARRAY_INT8: return Int8Array::New(buffer, viewOffset, elementCount); @@ -978,7 +1085,7 @@ NAN_METHOD(BSON::BSONSerialize) { Local object = bson->GetSerializeObject(args[0]); - BSONSerializer counter(bson, false, serializeFunctions); + BSONSerializer counter(bson, false, serializeFunctions, NULL); counter.SerializeDocument(object); object_size = counter.GetSerializeSize(); @@ -987,7 +1094,7 @@ NAN_METHOD(BSON::BSONSerialize) // Check if we have a boolean value bool checkKeys = args.Length() >= 3 && args[1]->IsBoolean() && args[1]->BooleanValue(); - BSONSerializer data(bson, checkKeys, serializeFunctions, serialized_object); + BSONSerializer data(bson, checkKeys, serializeFunctions, serialized_object, NULL); data.SerializeDocument(object); } catch(char *err_msg) @@ -1024,7 +1131,7 @@ NAN_METHOD(BSON::CalculateObjectSize) // Unpack the BSON parser instance BSON *bson = ObjectWrap::Unwrap(args.This()); bool serializeFunctions = (args.Length() >= 2) && args[1]->BooleanValue(); - BSONSerializer countSerializer(bson, false, serializeFunctions); + BSONSerializer countSerializer(bson, false, serializeFunctions, NULL); countSerializer.SerializeDocument(args[0]); // Return the object size @@ -1056,7 +1163,7 @@ NAN_METHOD(BSON::SerializeWithBufferAndIndex) bool checkKeys = args.Length() >= 4 && args[1]->IsBoolean() && args[1]->BooleanValue(); bool serializeFunctions = (args.Length() == 5) && args[4]->BooleanValue(); - BSONSerializer dataSerializer(bson, checkKeys, serializeFunctions, data+index); + BSONSerializer dataSerializer(bson, checkKeys, serializeFunctions, data+index, NULL); dataSerializer.SerializeDocument(bson->GetSerializeObject(args[0])); object_size = dataSerializer.GetSerializeSize(); diff --git a/src/bson.h b/src/bson.h index 5121cbe..1966036 100644 --- a/src/bson.h +++ b/src/bson.h @@ -36,6 +36,7 @@ enum BsonType BSON_TYPE_TIMESTAMP = 17, BSON_TYPE_LONG = 18, BSON_TYPE_TYPED_ARRAY = 19, + BSON_TYPE_TYPED_ARRAY_TRANSFERABLE = 20, BSON_TYPE_MAX_KEY = 0x7f, BSON_TYPE_MIN_KEY = 0xff }; @@ -158,6 +159,7 @@ class CountStream void WriteObjectId(const Handle& object, const Handle& key) { count += 12; } void WriteString(const Local& value) { count += value->Utf8Length() + 1; } // This returns the number of bytes exclusive of the NULL terminator void WriteData(const char* data, size_t length) { count += length; } + void WritePointer(void*) { count += sizeof(void*); } void* BeginWriteType() { ++count; return NULL; } void CommitType(void*, BsonType) { } @@ -184,10 +186,12 @@ class DataStream void WriteInt32(int32_t value) { *reinterpret_cast(p) = value; p += 4; } void WriteInt64(int64_t value) { *reinterpret_cast(p) = value; p += 8; } void WriteDouble(double value) { *reinterpret_cast(p) = value; p += 8; } + void WritePointer(void* value) { *reinterpret_cast(p) = value; p += sizeof(void*); } #else void WriteInt32(int32_t value) { memcpy(p, &value, 4); p += 4; } void WriteInt64(int64_t value) { memcpy(p, &value, 8); p += 8; } void WriteDouble(double value) { memcpy(p, &value, 8); p += 8; } + void WritePointer(void* value) { memcpy(p, &value, sizeof(void*)); p += sizeof(void*); } #endif void WriteBool(const Handle& value) { WriteByte(value->BooleanValue() ? 1 : 0); } void WriteInt32(const Handle& value) { WriteInt32(value->Int32Value()); } @@ -226,17 +230,33 @@ template class BSONSerializer : public T typedef T Inherited; public: - BSONSerializer(BSON* aBson, bool aCheckKeys, bool aSerializeFunctions) : Inherited(), checkKeys(aCheckKeys), serializeFunctions(aSerializeFunctions), bson(aBson) { } - BSONSerializer(BSON* aBson, bool aCheckKeys, bool aSerializeFunctions, char* parentParam) : Inherited(parentParam), checkKeys(aCheckKeys), serializeFunctions(aSerializeFunctions), bson(aBson) { } + BSONSerializer(BSON* aBson, bool aCheckKeys, bool aSerializeFunctions, const std::vector >* aTransferables) + : Inherited(), checkKeys(aCheckKeys), serializeFunctions(aSerializeFunctions), bson(aBson), transferables(aTransferables) + { + if (transferables != NULL) { transferableContents.resize(transferables->size(), NULL); } + } + BSONSerializer(BSON* aBson, bool aCheckKeys, bool aSerializeFunctions, char* parentParam, const std::vector >* aTransferables) + : Inherited(parentParam), checkKeys(aCheckKeys), serializeFunctions(aSerializeFunctions), bson(aBson), transferables(aTransferables) + { + if (transferables != NULL) { transferableContents.resize(transferables->size(), NULL); } + } void SerializeDocument(const Handle& value); void SerializeArray(const Handle& value); void SerializeValue(void* typeLocation, const Handle& value); private: + int TransferableIndex(Local buffer); + bool checkKeys; bool serializeFunctions; BSON* bson; + const std::vector >* transferables; + + // Parallels transferables. Each entry stores the pointer returned by ArrayBuffer::Externalize + // (if that buffer has already been externalized during serialization) or a NULL pointer if the + // corresponding buffer has not yet been externalized. + std::vector transferableContents; }; //=========================================================================== @@ -262,11 +282,13 @@ class BSONDeserializer uint32_t ReadUInt32() { uint32_t returnValue = *reinterpret_cast(p); p += 4; return returnValue; } int64_t ReadInt64() { int64_t returnValue = *reinterpret_cast(p); p += 8; return returnValue; } double ReadDouble() { double returnValue = *reinterpret_cast(p); p += 8; return returnValue; } + void* ReadPointer() { void* returnValue = *reinterpret_cast(p); p += sizeof(void*); return returnValue; } #else int32_t ReadInt32() { int32_t returnValue; memcpy(&returnValue, p, 4); p += 4; return returnValue; } uint32_t ReadUInt32() { uint32_t returnValue; memcpy(&returnValue, p, 4); p += 4; return returnValue; } int64_t ReadInt64() { int64_t returnValue; memcpy(&returnValue, p, 8); p += 8; return returnValue; } double ReadDouble() { double returnValue; memcpy(&returnValue, p, 8); p += 8; return returnValue; } + void* ReadPointer() { void* returnValue; memcpy(&returnValue, p, sizeof(void*)); p += sizeof(void*); return returnValue; } #endif size_t GetSerializeSize() const { return p - pStart; } diff --git a/src/worker.js b/src/worker.js index 004bb54..ab3f593 100644 --- a/src/worker.js +++ b/src/worker.js @@ -28,10 +28,12 @@ function Worker(){ } }; this.dispatchEvent = function(event){ - return t.emitSerialized(event.type, event); + return t.emitSerialized(event.type, null, event); }; - this.postMessage = function(data){ - return t.emitSerialized('message', { + this.postMessage = function(data, transferables){ + console.log('hay'); + if (transferables == null) { transferables = null; } + return t.emitSerialized('message', transferables, { data: data }); }; diff --git a/src/worker.ls b/src/worker.ls index 4570bbb..35989fb 100644 --- a/src/worker.ls +++ b/src/worker.ls @@ -10,8 +10,11 @@ function Worker () => Threads = this; class @onmessage = cb else t.on event, cb - @dispatch-event = (event) -> t.emitSerialized event.type, event - @post-message = (data) -> t.emitSerialized \message {data} + @dispatch-event = (event) -> t.emitSerialized event.type, null, event + @post-message = (data, transferables) -> + if not transferables? + transferables = null + t.emitSerialized \message, transferables, {data} if typeof code is \function t.eval "(#code)()" else if code? diff --git a/test/test30_postmessages.js b/test/test30_postmessages.js index 1403889..0afc009 100644 --- a/test/test30_postmessages.js +++ b/test/test30_postmessages.js @@ -1,10 +1,14 @@ - +// run with node --expose_gc. var t= require('../'); var worker = new t.Worker(function() { this.onmessage = function(e) { - postMessage(e.data); + if (e.data.array) { + postMessage({ array: e.data.array, test: e.data.test }, [e.data.array.buffer]); + } else { + postMessage(e.data); + } } }); @@ -44,14 +48,26 @@ worker.onmessage = function(event) { } }; +worker.onerror = function (event) { + console.log('error', event); +} + // Test a regular array. var a = new Uint8Array(64); for (var i = 0; i < 64; i++) { a[i] = i; } -worker.postMessage({ array: a, test: 'regular'}, [a.buffer]); +worker.postMessage({ array: a, test: 'regular'}); // Test an array which is a view into a larger buffer. var b = a.subarray(16, 32); worker.postMessage({ array: b, test: 'subarray'}); -worker.postMessage({ terminate: 1 }); +// Test transferable serialization +var t = new Uint8Array(b, 0, 64); +for (var i = 0; i < 64; i++) { t[i] = i; } +worker.postMessage({ array: t, test: 'regular'}, [t.buffer]); +if (t[0] == 0) { + console.log("ERROR: array was still accessible after transfer"); +} +worker.postMessage({ terminate: 1 }); +gc(); From 0441e2e7c1a8df8a7bd0fdef4d42d19e6acdfe8b Mon Sep 17 00:00:00 2001 From: Mark Logan Date: Fri, 17 Jul 2015 16:26:50 -0700 Subject: [PATCH 4/4] Recompile worker.ls --- src/worker.js | 10 ++++++---- src/worker.js.c | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/worker.js b/src/worker.js index ab3f593..d087a25 100644 --- a/src/worker.js +++ b/src/worker.js @@ -1,3 +1,4 @@ +// Generated by LiveScript 1.3.1 function Worker(){ var Threads; Threads = this; @@ -7,12 +8,12 @@ function Worker(){ var t, this$ = this; this.thread = t = Threads.create(); t.on('message', function(args){ - return typeof this$.onmessage === 'function' ? this$.onmessage({ + return typeof this$.onmessage == 'function' ? this$.onmessage({ data: args }) : void 8; }); t.on('error', function(args){ - return typeof this$.onerror === 'function' ? this$.onerror(args) : void 8; + return typeof this$.onerror == 'function' ? this$.onerror(args) : void 8; }); t.on('close', function(){ return t.destroy(); @@ -31,8 +32,9 @@ function Worker(){ return t.emitSerialized(event.type, null, event); }; this.postMessage = function(data, transferables){ - console.log('hay'); - if (transferables == null) { transferables = null; } + if (transferables == null) { + transferables = null; + } return t.emitSerialized('message', transferables, { data: data }); diff --git a/src/worker.js.c b/src/worker.js.c index 1d97532..5574d00 100644 --- a/src/worker.js.c +++ b/src/worker.js.c @@ -1 +1 @@ -static const char* kWorker_js= "(\n\x66\x75\x6e\x63\x74\x69\x6f\x6e \x57\x6f\x72\x6b\x65\x72\x28\x29\x7b\x76\x61\x72 \x54\x68\x72\x65\x61\x64\x73\x3b\x54\x68\x72\x65\x61\x64\x73\x3d\x74\x68\x69\x73\x3b\x72\x65\x74\x75\x72\x6e\x28\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x76\x61\x72 \x70\x72\x6f\x74\x6f\x74\x79\x70\x65\x3d\x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x2e\x70\x72\x6f\x74\x6f\x74\x79\x70\x65\x3b\x66\x75\x6e\x63\x74\x69\x6f\x6e \x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x28\x63\x6f\x64\x65\x29\x7b\x76\x61\x72 \x74\x2c\x74\x68\x69\x73\x24\x3d\x74\x68\x69\x73\x3b\x74\x68\x69\x73\x2e\x74\x68\x72\x65\x61\x64\x3d\x74\x3d\x54\x68\x72\x65\x61\x64\x73\x2e\x63\x72\x65\x61\x74\x65\x28\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x61\x72\x67\x73\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x79\x70\x65\x6f\x66 \x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x3d\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x3f\x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x28\x7b\x64\x61\x74\x61\x3a\x61\x72\x67\x73\x7d\x29\x3a\x76\x6f\x69\x64 \x38\x3b\x7d\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x65\x72\x72\x6f\x72\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x61\x72\x67\x73\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x79\x70\x65\x6f\x66 \x74\x68\x69\x73\x24\x2e\x6f\x6e\x65\x72\x72\x6f\x72\x3d\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x3f\x74\x68\x69\x73\x24\x2e\x6f\x6e\x65\x72\x72\x6f\x72\x28\x61\x72\x67\x73\x29\x3a\x76\x6f\x69\x64 \x38\x3b\x7d\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x63\x6c\x6f\x73\x65\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x64\x65\x73\x74\x72\x6f\x79\x28\x29\x3b\x7d\x29\x3b\x74\x68\x69\x73\x2e\x74\x65\x72\x6d\x69\x6e\x61\x74\x65\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x64\x65\x73\x74\x72\x6f\x79\x28\x29\x3b\x7d\x3b\x74\x68\x69\x73\x2e\x61\x64\x64\x45\x76\x65\x6e\x74\x4c\x69\x73\x74\x65\x6e\x65\x72\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x65\x76\x65\x6e\x74\x2c\x63\x62\x29\x7b\x69\x66\x28\x65\x76\x65\x6e\x74\x3d\x3d\x3d\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x3d\x63\x62\x3b\x7d\x65\x6c\x73\x65\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x6f\x6e\x28\x65\x76\x65\x6e\x74\x2c\x63\x62\x29\x3b\x7d\x7d\x3b\x74\x68\x69\x73\x2e\x64\x69\x73\x70\x61\x74\x63\x68\x45\x76\x65\x6e\x74\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x65\x76\x65\x6e\x74\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x65\x6d\x69\x74\x53\x65\x72\x69\x61\x6c\x69\x7a\x65\x64\x28\x65\x76\x65\x6e\x74\x2e\x74\x79\x70\x65\x2c\x65\x76\x65\x6e\x74\x29\x3b\x7d\x3b\x74\x68\x69\x73\x2e\x70\x6f\x73\x74\x4d\x65\x73\x73\x61\x67\x65\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x64\x61\x74\x61\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x65\x6d\x69\x74\x53\x65\x72\x69\x61\x6c\x69\x7a\x65\x64\x28\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x2c\x7b\x64\x61\x74\x61\x3a\x64\x61\x74\x61\x7d\x29\x3b\x7d\x3b\x69\x66\x28\x74\x79\x70\x65\x6f\x66 \x63\x6f\x64\x65\x3d\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x29\x7b\x74\x2e\x65\x76\x61\x6c\x28\x22\x28\x22\x2b\x63\x6f\x64\x65\x2b\x22\x29\x28\x29\x22\x29\x3b\x7d\x65\x6c\x73\x65 \x69\x66\x28\x63\x6f\x64\x65\x21\x3d\x6e\x75\x6c\x6c\x29\x7b\x74\x2e\x6c\x6f\x61\x64\x28\x63\x6f\x64\x65\x29\x3b\x7d\x7d\n\x72\x65\x74\x75\x72\x6e \x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x3b\x7d\x28\x29\x29\x3b\x7d)"; +static const char* kWorker_js= "(\n\x66\x75\x6e\x63\x74\x69\x6f\x6e \x57\x6f\x72\x6b\x65\x72\x28\x29\x7b\x76\x61\x72 \x54\x68\x72\x65\x61\x64\x73\x3b\x54\x68\x72\x65\x61\x64\x73\x3d\x74\x68\x69\x73\x3b\x72\x65\x74\x75\x72\x6e\x28\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x76\x61\x72 \x70\x72\x6f\x74\x6f\x74\x79\x70\x65\x3d\x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x2e\x70\x72\x6f\x74\x6f\x74\x79\x70\x65\x3b\x66\x75\x6e\x63\x74\x69\x6f\x6e \x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x28\x63\x6f\x64\x65\x29\x7b\x76\x61\x72 \x74\x2c\x74\x68\x69\x73\x24\x3d\x74\x68\x69\x73\x3b\x74\x68\x69\x73\x2e\x74\x68\x72\x65\x61\x64\x3d\x74\x3d\x54\x68\x72\x65\x61\x64\x73\x2e\x63\x72\x65\x61\x74\x65\x28\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x61\x72\x67\x73\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x79\x70\x65\x6f\x66 \x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x3f\x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x28\x7b\x64\x61\x74\x61\x3a\x61\x72\x67\x73\x7d\x29\x3a\x76\x6f\x69\x64 \x38\x3b\x7d\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x65\x72\x72\x6f\x72\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x61\x72\x67\x73\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x79\x70\x65\x6f\x66 \x74\x68\x69\x73\x24\x2e\x6f\x6e\x65\x72\x72\x6f\x72\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x3f\x74\x68\x69\x73\x24\x2e\x6f\x6e\x65\x72\x72\x6f\x72\x28\x61\x72\x67\x73\x29\x3a\x76\x6f\x69\x64 \x38\x3b\x7d\x29\x3b\x74\x2e\x6f\x6e\x28\x27\x63\x6c\x6f\x73\x65\x27\x2c\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x64\x65\x73\x74\x72\x6f\x79\x28\x29\x3b\x7d\x29\x3b\x74\x68\x69\x73\x2e\x74\x65\x72\x6d\x69\x6e\x61\x74\x65\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x64\x65\x73\x74\x72\x6f\x79\x28\x29\x3b\x7d\x3b\x74\x68\x69\x73\x2e\x61\x64\x64\x45\x76\x65\x6e\x74\x4c\x69\x73\x74\x65\x6e\x65\x72\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x65\x76\x65\x6e\x74\x2c\x63\x62\x29\x7b\x69\x66\x28\x65\x76\x65\x6e\x74\x3d\x3d\x3d\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x68\x69\x73\x24\x2e\x6f\x6e\x6d\x65\x73\x73\x61\x67\x65\x3d\x63\x62\x3b\x7d\x65\x6c\x73\x65\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x6f\x6e\x28\x65\x76\x65\x6e\x74\x2c\x63\x62\x29\x3b\x7d\x7d\x3b\x74\x68\x69\x73\x2e\x64\x69\x73\x70\x61\x74\x63\x68\x45\x76\x65\x6e\x74\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x65\x76\x65\x6e\x74\x29\x7b\x72\x65\x74\x75\x72\x6e \x74\x2e\x65\x6d\x69\x74\x53\x65\x72\x69\x61\x6c\x69\x7a\x65\x64\x28\x65\x76\x65\x6e\x74\x2e\x74\x79\x70\x65\x2c\x6e\x75\x6c\x6c\x2c\x65\x76\x65\x6e\x74\x29\x3b\x7d\x3b\x74\x68\x69\x73\x2e\x70\x6f\x73\x74\x4d\x65\x73\x73\x61\x67\x65\x3d\x66\x75\x6e\x63\x74\x69\x6f\x6e\x28\x64\x61\x74\x61\x2c\x74\x72\x61\x6e\x73\x66\x65\x72\x61\x62\x6c\x65\x73\x29\x7b\x69\x66\x28\x74\x72\x61\x6e\x73\x66\x65\x72\x61\x62\x6c\x65\x73\x3d\x3d\x6e\x75\x6c\x6c\x29\x7b\x74\x72\x61\x6e\x73\x66\x65\x72\x61\x62\x6c\x65\x73\x3d\x6e\x75\x6c\x6c\x3b\x7d\n\x72\x65\x74\x75\x72\x6e \x74\x2e\x65\x6d\x69\x74\x53\x65\x72\x69\x61\x6c\x69\x7a\x65\x64\x28\x27\x6d\x65\x73\x73\x61\x67\x65\x27\x2c\x74\x72\x61\x6e\x73\x66\x65\x72\x61\x62\x6c\x65\x73\x2c\x7b\x64\x61\x74\x61\x3a\x64\x61\x74\x61\x7d\x29\x3b\x7d\x3b\x69\x66\x28\x74\x79\x70\x65\x6f\x66 \x63\x6f\x64\x65\x3d\x3d\x3d\x27\x66\x75\x6e\x63\x74\x69\x6f\x6e\x27\x29\x7b\x74\x2e\x65\x76\x61\x6c\x28\x22\x28\x22\x2b\x63\x6f\x64\x65\x2b\x22\x29\x28\x29\x22\x29\x3b\x7d\x65\x6c\x73\x65 \x69\x66\x28\x63\x6f\x64\x65\x21\x3d\x6e\x75\x6c\x6c\x29\x7b\x74\x2e\x6c\x6f\x61\x64\x28\x63\x6f\x64\x65\x29\x3b\x7d\x7d\n\x72\x65\x74\x75\x72\x6e \x63\x6f\x6e\x73\x74\x72\x75\x63\x74\x6f\x72\x3b\x7d\x28\x29\x29\x3b\x7d)";