-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathv2.go
206 lines (167 loc) · 4.71 KB
/
v2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package pubsub
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
)
// ChanV2 is the name of the channel which carries V2* payloads. V2Sub.Listen
// subscribes to this channel.
const ChanV2 = "v2ch"
// V2Listener describes the messages that sync v2 pollers will publish.
//
// Each method takes exactly one V2* payload type. V2Sub calls the method
// matching the concrete type of every payload it receives on ChanV2.
type V2Listener interface {
	// Payloads whose only identifying field is a room ID.
	Initialise(p *V2Initialise)
	Accumulate(p *V2Accumulate)
	OnInvalidateRoom(p *V2InvalidateRoom)
	OnStateRedaction(p *V2StateRedaction)
	OnTyping(p *V2Typing)
	OnReceipt(p *V2Receipt)

	// Payloads carrying both a user ID and a room ID.
	OnAccountData(p *V2AccountData)
	OnInvite(p *V2InviteRoom)
	OnLeftRoom(p *V2LeaveRoom)
	OnUnreadCounts(p *V2UnreadCounts)

	// Payloads carrying both a user ID and a device ID.
	OnInitialSyncComplete(p *V2InitialSyncComplete)
	OnDeviceMessages(p *V2DeviceMessages)
	OnExpiredToken(p *V2ExpiredToken)

	// Remaining payloads: a per-event transaction ID notification, and a map
	// of user IDs to device IDs.
	OnTransactionID(p *V2TransactionID)
	OnDeviceData(p *V2DeviceData)
}
// V2Initialise is the payload delivered to V2Listener.Initialise.
type V2Initialise struct {
	RoomID      string
	SnapshotNID int64
}

// Type returns the name identifying this payload type, "V2Initialise".
func (*V2Initialise) Type() string {
	return "V2Initialise"
}
// V2Accumulate is the payload delivered to V2Listener.Accumulate.
type V2Accumulate struct {
	RoomID    string
	PrevBatch string
	EventNIDs []int64
}

// Type returns the name identifying this payload type, "V2Accumulate".
func (*V2Accumulate) Type() string {
	return "V2Accumulate"
}
// V2TransactionID is emitted by a poller in one of two situations: it has seen
// an event that carries a transaction ID, or it is certain that no other poller
// will see a transaction ID for this event (the "all-clear").
//
// It is the payload delivered to V2Listener.OnTransactionID.
type V2TransactionID struct {
	EventID       string
	RoomID        string
	UserID        string // of the sender
	DeviceID      string
	TransactionID string // Note: an empty transaction ID represents the all-clear.
	NID           int64
}

// Type returns the name identifying this payload type, "V2TransactionID".
func (*V2TransactionID) Type() string {
	return "V2TransactionID"
}
// V2UnreadCounts is the payload delivered to V2Listener.OnUnreadCounts.
//
// Both counts are pointers and so may be nil.
// NOTE(review): what a nil count signifies is decided by the publisher and is
// not visible in this file - confirm against the poller.
type V2UnreadCounts struct {
	UserID            string
	RoomID            string
	HighlightCount    *int
	NotificationCount *int
}

// Type returns the name identifying this payload type, "V2UnreadCounts".
func (*V2UnreadCounts) Type() string {
	return "V2UnreadCounts"
}
// V2AccountData is the payload delivered to V2Listener.OnAccountData.
type V2AccountData struct {
	UserID string
	RoomID string
	Types  []string
}

// Type returns the name identifying this payload type, "V2AccountData".
func (*V2AccountData) Type() string {
	return "V2AccountData"
}
// V2LeaveRoom is the payload delivered to V2Listener.OnLeftRoom.
type V2LeaveRoom struct {
	UserID     string
	RoomID     string
	LeaveEvent json.RawMessage // raw, undecoded JSON
}

// Type returns the name identifying this payload type, "V2LeaveRoom".
func (*V2LeaveRoom) Type() string {
	return "V2LeaveRoom"
}
// V2InviteRoom is the payload delivered to V2Listener.OnInvite.
type V2InviteRoom struct {
	UserID, RoomID string
}

// Type returns the name identifying this payload type, "V2InviteRoom".
func (*V2InviteRoom) Type() string {
	return "V2InviteRoom"
}
// V2InitialSyncComplete is the payload delivered to
// V2Listener.OnInitialSyncComplete.
type V2InitialSyncComplete struct {
	UserID   string
	DeviceID string
	Success  bool
}

// Type returns the name identifying this payload type,
// "V2InitialSyncComplete".
func (*V2InitialSyncComplete) Type() string {
	return "V2InitialSyncComplete"
}
// V2DeviceData is the payload delivered to V2Listener.OnDeviceData.
type V2DeviceData struct {
	// UserIDToDeviceIDs maps a user ID to a list of that user's device IDs.
	UserIDToDeviceIDs map[string][]string
}

// Type returns the name identifying this payload type, "V2DeviceData".
func (*V2DeviceData) Type() string {
	return "V2DeviceData"
}
// V2Typing is the payload delivered to V2Listener.OnTyping.
type V2Typing struct {
	RoomID         string
	EphemeralEvent json.RawMessage // raw, undecoded JSON
}

// Type returns the name identifying this payload type, "V2Typing".
func (*V2Typing) Type() string {
	return "V2Typing"
}
// V2Receipt is the payload delivered to V2Listener.OnReceipt.
type V2Receipt struct {
	RoomID   string
	Receipts []internal.Receipt
}

// Type returns the name identifying this payload type, "V2Receipt".
func (*V2Receipt) Type() string {
	return "V2Receipt"
}
// V2DeviceMessages is the payload delivered to V2Listener.OnDeviceMessages.
type V2DeviceMessages struct {
	UserID, DeviceID string
}

// Type returns the name identifying this payload type, "V2DeviceMessages".
func (*V2DeviceMessages) Type() string {
	return "V2DeviceMessages"
}
// V2ExpiredToken is the payload delivered to V2Listener.OnExpiredToken.
type V2ExpiredToken struct {
	UserID, DeviceID string
}

// Type returns the name identifying this payload type, "V2ExpiredToken".
func (*V2ExpiredToken) Type() string {
	return "V2ExpiredToken"
}
// V2StateRedaction is emitted when a timeline is seen that contains one or
// more redaction events targeting a piece of room state. It is emitted before
// the V2Accumulate payload corresponding to that timeline.
//
// It is the payload delivered to V2Listener.OnStateRedaction.
type V2StateRedaction struct {
	RoomID string
}

// Type returns the name identifying this payload type, "V2StateRedaction".
func (*V2StateRedaction) Type() string {
	return "V2StateRedaction"
}
// V2InvalidateRoom is emitted after a non-incremental state change to a room.
// It takes the place of a V2Initialise payload.
//
// It is the payload delivered to V2Listener.OnInvalidateRoom.
type V2InvalidateRoom struct {
	RoomID string
}

// Type returns the name identifying this payload type, "V2InvalidateRoom".
func (*V2InvalidateRoom) Type() string {
	return "V2InvalidateRoom"
}
// V2Sub pairs a Listener with the V2Listener that should receive the V2*
// payloads arriving on ChanV2.
type V2Sub struct {
	listener Listener   // source of payloads; used by Listen and Teardown
	receiver V2Listener // destination for dispatched payloads
}
// NewV2Sub returns a V2Sub that reads payloads from l and dispatches them to
// recv. Nothing is subscribed until Listen is called.
func NewV2Sub(l Listener, recv V2Listener) *V2Sub {
	sub := new(V2Sub)
	sub.listener = l
	sub.receiver = recv
	return sub
}
// Teardown closes the listener this subscriber was created with.
func (v *V2Sub) Teardown() {
	v.listener.Close()
}
// onMessage dispatches a single payload to the receiver method that matches
// the payload's concrete type. A payload whose type has no matching case is
// reported with a warning log and is otherwise dropped.
//
// Every case is a distinct concrete pointer type, so the order of the cases
// has no effect on which one is selected.
func (v *V2Sub) onMessage(p Payload) {
	switch msg := p.(type) {
	case *V2Initialise:
		v.receiver.Initialise(msg)
	case *V2Accumulate:
		v.receiver.Accumulate(msg)
	case *V2InvalidateRoom:
		v.receiver.OnInvalidateRoom(msg)
	case *V2StateRedaction:
		v.receiver.OnStateRedaction(msg)
	case *V2TransactionID:
		v.receiver.OnTransactionID(msg)
	case *V2AccountData:
		v.receiver.OnAccountData(msg)
	case *V2InviteRoom:
		v.receiver.OnInvite(msg)
	case *V2LeaveRoom:
		v.receiver.OnLeftRoom(msg)
	case *V2UnreadCounts:
		v.receiver.OnUnreadCounts(msg)
	case *V2Typing:
		v.receiver.OnTyping(msg)
	case *V2Receipt:
		v.receiver.OnReceipt(msg)
	case *V2InitialSyncComplete:
		v.receiver.OnInitialSyncComplete(msg)
	case *V2DeviceData:
		v.receiver.OnDeviceData(msg)
	case *V2DeviceMessages:
		v.receiver.OnDeviceMessages(msg)
	case *V2ExpiredToken:
		v.receiver.OnExpiredToken(msg)
	default:
		// Not a V2* payload known to this subscriber: log its type name.
		warning := logger.Warn().Str("type", p.Type())
		warning.Msg("V2Sub: unhandled payload type")
	}
}
// Listen registers onMessage as the callback for ChanV2 on the underlying
// listener and returns whatever error that registration call yields.
//
// NOTE(review): whether this call blocks depends on the Listener
// implementation, which is not visible in this file.
func (v *V2Sub) Listen() error {
	callback := v.onMessage
	return v.listener.Listen(ChanV2, callback)
}