Skip to content

Sockets #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/FetchList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import { computed, inject, onUnmounted, ref, useSlots, watch } from 'vue';
import useFetchState from './useFetchState';

import type Sockets from './sockets';

type SortSettings = {
field: string;
dir?: 'ASC' | 'DESC';
Expand All @@ -39,11 +41,59 @@ const props = defineProps({
const emit = defineEmits(['ready', 'loaded', 'update:modelValue']);

const stores = inject('stores') as any;
const socket = inject('socket') as Sockets<any>;
const store = stores[props.model]();
const cache = (inject('cache') as any)();
const settings = (inject('models') as any)[props.model];
const slots = useSlots();

const socketId = ref('#' + Math.random());

/* const sub = socket.subscribeToModel(props.model, props.filter);
sub.on('db:create', (data) => {
store.add([data]);
pushId(data.id);
});
sub.on('db:update', (data) => {
store.add([data]);
});
sub.on('db:delete', (data) => {
store.localDelete(data.id);
}); */

/* socketId.value = socket.subscribeToModel(props.model, props.filter, (event) => {
if (event.type === 'db:create') {
store.add([event.data]);
pushId(event.data.id);
} else if (event.type === 'db:update') {
store.add([event.data]);
} else if (event.type === 'db:delete') {
store.localDelete(event.data.id);
}
console.log('Subscription gave event', event);
}); */
/* if (socket.readyState === 1) {
socket.send(
JSON.stringify({
id: socketId.value,
subscribe: props.model,
filter: props.filter,
events: ['db:create', 'db:update', 'db:delete'],
})
);
} else {
socket.addEventListener('open', () => {
socket.send(
JSON.stringify({
id: socketId.value,
subscribe: props.model,
filter: props.filter,
events: ['db:create', 'db:update', 'db:delete'],
})
);
});
} */

const relations = computed(() => ({
...settings.hasMany,
...settings.belongsTo,
Expand Down Expand Up @@ -144,6 +194,7 @@ fetch();
watch(filterString, () => {
fetch();
});

watch(ids, (newIds, oldIds) => {
cache.subscribe(props.model, newIds);
cache.unsubscribe(props.model, oldIds);
Expand All @@ -158,11 +209,21 @@ watch(includeIds, (newIds, oldIds) => {
});
});

function handleSocketEvent(event: any) {
if (event.type === 'db:create' && event.subIds.includes(socketId.value)) {
pushId(event.id);
}
}

// socket.addEventListener('message', handleSocketEvent);

onUnmounted(() => {
cache.unsubscribe(props.model, ids.value);
Object.keys(includeIds.value).forEach((model) => {
cache.unsubscribe(model, includeIds.value[model]);
});

// sub.unsubscribe();
});

emit('ready', {
Expand Down
2 changes: 1 addition & 1 deletion src/components/createUseSingle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ref,
watch,
type ComputedRef,
nextTick,
} from 'vue';
import useFetchState from '../useFetchState';
import unwrap from './unwrap';
Expand Down Expand Up @@ -117,7 +118,6 @@ export default function createUseSingle<Models, IdType>(
// Update cache subscriptions when ids change

watch(includedIds, (newIds, oldIds) => {
console.log('Newold', newIds, oldIds);
Object.keys(newIds).forEach((model) => {
cacheStore.subscribe(model, newIds[model]);
});
Expand Down
12 changes: 11 additions & 1 deletion src/createStores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ function createStore(
if (sort.length) params.sort = createSortString(sort);
if (include.length) params.include = include.join(',');
if ('cursor' in params && params.cursor === undefined) {
console.log('Deleting emoty nextcursor');
delete params.cursor;
}
const url = overridePath || endpoint;
Expand Down Expand Up @@ -241,6 +240,10 @@ function createStore(
this.items = this.items.filter((item: any) => item.id !== id);
});
},
localDelete(id: ID) {
console.log('local delete', id);
this.items = this.items.filter((item: any) => item.id !== id);
},
bulkDelete(ids: ID[]) {
return api
.delete(endpoint + '/bulk', ids.map((id) => ({ id })) as any)
Expand All @@ -253,6 +256,12 @@ function createStore(
garbageCollect(ids: ID[]) {
this.items = this.items.filter((item: any) => !ids.includes(item.id));
},
sync(itemOrItems: any[] | any) {
const asArray = Array.isArray(itemOrItems)
? itemOrItems
: [itemOrItems];
this.add(asArray);
},
},
});
}
Expand Down Expand Up @@ -341,6 +350,7 @@ export default function createStores<Type, ModelInfo>(
delete: (id: Type[K]['id']) => Promise<void>;
// @ts-expect-error
bulkDelete: (ids: Type[K]['id'][]) => Promise<void>;
sync: (itemOrItems: Partial<Type[K]> | Partial<Type[K]>[]) => void;
}
>;
};
Expand Down
17 changes: 17 additions & 0 deletions src/createVroom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import createApi from './api';
import createUseList from './components/createUseList';
import createUseSingle from './components/createUseSingle';
import createUseSingleton from './components/createUseSingleton';
import Sockets from './sockets';
import Mocket from './server/Mocket';

export default function createVroom<Options extends Settings & { models: any }>(
options: Options
Expand All @@ -33,6 +35,18 @@ export default function createVroom<Options extends Settings & { models: any }>(
const server = __DEV__
? createServer<typeof db, IdentityModel>(settings, models, db)
: null;
const mocket =
__DEV__ && settings.server?.enable
? new Mocket<typeof db, IdentityModel>(
db,
settings.identityModel ? settings.identityModel() : null
)
: null;

const socket = new Sockets<ModelTypes>(
settings.ws,
mocket as any as Mocket<any, any>
);
const api = createApi(server);

const stores = createStores<ModelTypes, Options['models']>(
Expand All @@ -51,6 +65,8 @@ export default function createVroom<Options extends Settings & { models: any }>(
server,
stores,
cache,
socket,
mocket,
types: {} as ModelTypes,
useList: createUseList<ModelTypes, IdType<Options>['id']>(
models,
Expand All @@ -67,6 +83,7 @@ export default function createVroom<Options extends Settings & { models: any }>(
app.provide('stores', stores);
app.provide('models', models);
app.provide('cache', cache);
app.provide('socket', socket);
app.provide('vroomTypes', {} as ModelTypes);
app.component('FetchList', FetchList);
app.component('FetchSingle', FetchSingle);
Expand Down
121 changes: 121 additions & 0 deletions src/server/Mocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
type EventName = 'open' | 'close' | 'message';

export const socketChannel = new BroadcastChannel('socket:events');

let readyState = 0;

export default class Mocket<DbType, IdentityModel> {
public listeners: { [key in EventName]?: Array<(event: Event) => void> };
public subscriptions: Array<any>;
public handlers: Array<any>;
public readyState: number;
private db: DbType;
private identityModel: string;
private identity: IdentityModel | null;

constructor(db: DbType, identityModel: string) {
this.listeners = {};
this.subscriptions = [];
this.handlers = [];
this.readyState = 0;
this.db = db;
this.identityModel = identityModel;
this.identity = null;

setTimeout(() => {
this.readyState = 1;
readyState = 1;
this.listeners.open?.forEach((h) => h({} as any));
}, 150);

socketChannel.onmessage = (ev: any) => {
let subscribed = false;

this.subscriptions.forEach((s) => {
// console.log('Message received', ev, s);
if (ev.data.event) {
if (ev.data.event === s.event) subscribed = true;
} else if (
ev.data.model === s.model &&
s.ids &&
s.ids.includes(ev.data.id) &&
(!s.events || s.events.includes(ev.data.type))
) {
subscribed = true;
}
});

if (!subscribed) return;
const data = { ...ev.data };
console.log('🔻', data, this.listeners.message?.length);
this.listeners['message']?.forEach((handler) => {
handler({
// @ts-ignore
data: JSON.stringify(data),
});
});
};
}

public addEventListener(name: EventName, handler: (event: Event) => void) {
if (!this.listeners[name]) {
this.listeners[name] = [];
}
this.listeners[name]?.push(handler);
}

public removeEventListener(name: EventName, handler: (event: Event) => void) {
this.listeners[name] = this.listeners[name]?.filter((h) => h !== handler);
}

public addHandler(
model: string,
type: string,
handler: (data: any, db: DbType, identity: IdentityModel | null) => any
) {
this.handlers.push({ model, type, handler });
}

public broadcast(data: Object) {
this.send(JSON.stringify(data));
}

public send(data: string) {
const object = JSON.parse(data);
console.log('🟢:', object);
if (object.subscribe) {
this.subscriptions.push({
id: object.id,
model: object.subscribe,
ids: object.ids,
events: object.events || null,
});
console.log('Subscriptions are now', this.subscriptions);
} else if (object.unsubscribe) {
this.subscriptions = this.subscriptions.filter((s) => s.id !== object.id);
console.log('Subscriptions are now', this.subscriptions);
} else if (object.auth) {
// @ts-ignore
this.identity = this.db[this.identityModel].find(object.auth);
} else {
const handler = this.handlers.find(
(h) => h.model === object.model && h.type === object.type
);
if (handler) sendMessage(handler.handler(object, this.db, this.identity));
else {
sendMessage(object);
}
}
}
}

export function sendMessage(message: any) {
if (readyState !== 1) return;
socketChannel.postMessage(message);
}

/* const socketConnection = new Mocket();



export default socketConnection; */
Loading