Skip to content
This repository was archived by the owner on Feb 1, 2020. It is now read-only.

Commit c17ea8c

Browse files
sepehrtheraissshentubot
authored andcommitted
Block for link address resolution
Previously, if address resolution for UDP or Ping sockets required sending packets using Write in Transport layer, Resolve would return ErrWouldBlock and Write would return ErrNoLinkAddress. Meanwhile startAddressResolution would run in background. Further calls to Write using same address would also return ErrNoLinkAddress until resolution has been completed successfully. Since Write is not allowed to block and System Calls need to be interruptible in System Call layer, the caller to Write is responsible for blocking upon return of ErrWouldBlock. Now, when startAddressResolution is called a notification channel for the completion of the address resolution is returned. The channel will traverse up to the calling function of Write as well as ErrNoLinkAddress. Once address resolution is complete (success or not) the channel is closed. The caller would call Write again to send packets and check if address resolution was compeleted successfully or not. Fixes google#5 Change-Id: Idafaf31982bee1915ca084da39ae7bd468cebd93 PiperOrigin-RevId: 214962200
1 parent cf226d4 commit c17ea8c

File tree

20 files changed

+220
-129
lines changed

20 files changed

+220
-129
lines changed

pkg/dhcp/client.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,23 @@ func (c *Client) Request(ctx context.Context, requestedAddr tcpip.Address) (cfg
195195
wopts := tcpip.WriteOptions{
196196
To: serverAddr,
197197
}
198-
if _, err := ep.Write(tcpip.SlicePayload(h), wopts); err != nil {
198+
var resCh <-chan struct{}
199+
if _, resCh, err = ep.Write(tcpip.SlicePayload(h), wopts); err != nil && resCh == nil {
199200
return Config{}, fmt.Errorf("dhcp discovery write: %v", err)
200201
}
201202

203+
if resCh != nil {
204+
select {
205+
case <-resCh:
206+
case <-ctx.Done():
207+
return Config{}, fmt.Errorf("dhcp client address resolution: %v", tcpip.ErrAborted)
208+
}
209+
210+
if _, _, err := ep.Write(tcpip.SlicePayload(h), wopts); err != nil {
211+
return Config{}, fmt.Errorf("dhcp discovery write: %v", err)
212+
}
213+
}
214+
202215
we, ch := waiter.NewChannelEntry(nil)
203216
wq.EventRegister(&we, waiter.EventIn)
204217
defer wq.EventUnregister(&we)
@@ -289,7 +302,7 @@ func (c *Client) Request(ctx context.Context, requestedAddr tcpip.Address) (cfg
289302
reqOpts = append(reqOpts, option{optClientID, clientID})
290303
}
291304
h.setOptions(reqOpts)
292-
if _, err := ep.Write(tcpip.SlicePayload(h), wopts); err != nil {
305+
if _, _, err := ep.Write(tcpip.SlicePayload(h), wopts); err != nil {
293306
return Config{}, fmt.Errorf("dhcp discovery write: %v", err)
294307
}
295308

pkg/dhcp/server.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,22 @@ func (c *epConn) Read() (buffer.View, tcpip.FullAddress, error) {
9595
}
9696

9797
func (c *epConn) Write(b []byte, addr *tcpip.FullAddress) error {
98-
if _, err := c.ep.Write(tcpip.SlicePayload(b), tcpip.WriteOptions{To: addr}); err != nil {
98+
_, resCh, err := c.ep.Write(tcpip.SlicePayload(b), tcpip.WriteOptions{To: addr})
99+
if err != nil && resCh == nil {
99100
return fmt.Errorf("write: %v", err)
100101
}
102+
103+
if resCh != nil {
104+
select {
105+
case <-resCh:
106+
case <-c.ctx.Done():
107+
return fmt.Errorf("dhcp server address resolution: %v", tcpip.ErrAborted)
108+
}
109+
110+
if _, _, err := c.ep.Write(tcpip.SlicePayload(b), tcpip.WriteOptions{To: addr}); err != nil {
111+
return fmt.Errorf("write: %v", err)
112+
}
113+
}
101114
return nil
102115
}
103116

pkg/sentry/socket/epsocket/epsocket.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,21 @@ func (i *ioSequencePayload) Size() int {
276276
// Write implements fs.FileOperations.Write.
277277
func (s *SocketOperations) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
278278
f := &ioSequencePayload{ctx: ctx, src: src}
279-
n, err := s.Endpoint.Write(f, tcpip.WriteOptions{})
279+
n, resCh, err := s.Endpoint.Write(f, tcpip.WriteOptions{})
280280
if err == tcpip.ErrWouldBlock {
281281
return int64(n), syserror.ErrWouldBlock
282282
}
283+
284+
if resCh != nil {
285+
t := ctx.(*kernel.Task)
286+
if err := t.Block(resCh); err != nil {
287+
return int64(n), syserr.FromError(err).ToError()
288+
}
289+
290+
n, _, err = s.Endpoint.Write(f, tcpip.WriteOptions{})
291+
return int64(n), syserr.TranslateNetstackError(err).ToError()
292+
}
293+
283294
return int64(n), syserr.TranslateNetstackError(err).ToError()
284295
}
285296

@@ -1016,7 +1027,13 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
10161027
EndOfRecord: flags&linux.MSG_EOR != 0,
10171028
}
10181029

1019-
n, err := s.Endpoint.Write(tcpip.SlicePayload(v), opts)
1030+
n, resCh, err := s.Endpoint.Write(tcpip.SlicePayload(v), opts)
1031+
if resCh != nil {
1032+
if err := t.Block(resCh); err != nil {
1033+
return int(n), syserr.FromError(err)
1034+
}
1035+
n, _, err = s.Endpoint.Write(tcpip.SlicePayload(v), opts)
1036+
}
10201037
if err != tcpip.ErrWouldBlock || flags&linux.MSG_DONTWAIT != 0 {
10211038
return int(n), syserr.TranslateNetstackError(err)
10221039
}
@@ -1030,7 +1047,7 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
10301047
v.TrimFront(int(n))
10311048
total := n
10321049
for {
1033-
n, err = s.Endpoint.Write(tcpip.SlicePayload(v), opts)
1050+
n, _, err = s.Endpoint.Write(tcpip.SlicePayload(v), opts)
10341051
v.TrimFront(int(n))
10351052
total += n
10361053
if err != tcpip.ErrWouldBlock {

pkg/tcpip/adapters/gonet/gonet.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,22 @@ func (c *Conn) Write(b []byte) (int, error) {
393393
}
394394

395395
var n uintptr
396-
n, err = c.ep.Write(tcpip.SlicePayload(v), tcpip.WriteOptions{})
396+
var resCh <-chan struct{}
397+
n, resCh, err = c.ep.Write(tcpip.SlicePayload(v), tcpip.WriteOptions{})
397398
nbytes += int(n)
398399
v.TrimFront(int(n))
400+
401+
if resCh != nil {
402+
select {
403+
case <-deadline:
404+
return nbytes, c.newOpError("write", &timeoutError{})
405+
case <-resCh:
406+
}
407+
408+
n, _, err = c.ep.Write(tcpip.SlicePayload(v), tcpip.WriteOptions{})
409+
nbytes += int(n)
410+
v.TrimFront(int(n))
411+
}
399412
}
400413

401414
if err == nil {
@@ -571,23 +584,33 @@ func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
571584
copy(v, b)
572585

573586
wopts := tcpip.WriteOptions{To: &fullAddr}
574-
n, err := c.ep.Write(tcpip.SlicePayload(v), wopts)
587+
n, resCh, err := c.ep.Write(tcpip.SlicePayload(v), wopts)
588+
if resCh != nil {
589+
select {
590+
case <-deadline:
591+
return int(n), c.newRemoteOpError("write", addr, &timeoutError{})
592+
case <-resCh:
593+
}
594+
595+
n, _, err = c.ep.Write(tcpip.SlicePayload(v), wopts)
596+
}
575597

576598
if err == tcpip.ErrWouldBlock {
577599
// Create wait queue entry that notifies a channel.
578600
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
579601
c.wq.EventRegister(&waitEntry, waiter.EventOut)
580602
defer c.wq.EventUnregister(&waitEntry)
581603
for {
582-
n, err = c.ep.Write(tcpip.SlicePayload(v), wopts)
583-
if err != tcpip.ErrWouldBlock {
584-
break
585-
}
586604
select {
587605
case <-deadline:
588606
return int(n), c.newRemoteOpError("write", addr, &timeoutError{})
589607
case <-notifyCh:
590608
}
609+
610+
n, _, err = c.ep.Write(tcpip.SlicePayload(v), wopts)
611+
if err != tcpip.ErrWouldBlock {
612+
break
613+
}
591614
}
592615
}
593616

pkg/tcpip/network/ipv6/icmp_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func TestLinkResolution(t *testing.T) {
190190
if ctx.Err() != nil {
191191
break
192192
}
193-
if _, err := ep.Write(payload, tcpip.WriteOptions{To: &tcpip.FullAddress{NIC: 1, Addr: lladdr1}}); err == tcpip.ErrNoLinkAddress {
193+
if _, _, err := ep.Write(payload, tcpip.WriteOptions{To: &tcpip.FullAddress{NIC: 1, Addr: lladdr1}}); err == tcpip.ErrNoLinkAddress {
194194
// There's something asynchronous going on; yield to let it do its thing.
195195
runtime.Gosched()
196196
} else if err == nil {

pkg/tcpip/sample/tun_tcp_connect/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func writer(ch chan struct{}, ep tcpip.Endpoint) {
8080

8181
v.CapLength(n)
8282
for len(v) > 0 {
83-
n, err := ep.Write(tcpip.SlicePayload(v), tcpip.WriteOptions{})
83+
n, _, err := ep.Write(tcpip.SlicePayload(v), tcpip.WriteOptions{})
8484
if err != nil {
8585
fmt.Println("Write failed:", err)
8686
return

pkg/tcpip/stack/linkaddrcache.go

+29-11
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,14 @@ type linkAddrEntry struct {
8888
linkAddr tcpip.LinkAddress
8989
expiration time.Time
9090
s entryState
91+
resDone bool
9192

9293
// wakers is a set of waiters for address resolution result. Anytime
9394
// state transitions out of 'incomplete' these waiters are notified.
9495
wakers map[*sleep.Waker]struct{}
9596

9697
cancel chan struct{}
98+
resCh chan struct{}
9799
}
98100

99101
func (e *linkAddrEntry) state() entryState {
@@ -182,15 +184,20 @@ func (c *linkAddrCache) makeAndAddEntry(k tcpip.FullAddress, v tcpip.LinkAddress
182184
// someone waiting for address resolution on it.
183185
entry.changeState(expired)
184186
if entry.cancel != nil {
185-
entry.cancel <- struct{}{}
187+
if !entry.resDone {
188+
close(entry.resCh)
189+
}
190+
close(entry.cancel)
186191
}
187192

188193
*entry = linkAddrEntry{
189194
addr: k,
190195
linkAddr: v,
191196
expiration: time.Now().Add(c.ageLimit),
197+
resDone: false,
192198
wakers: make(map[*sleep.Waker]struct{}),
193199
cancel: make(chan struct{}, 1),
200+
resCh: make(chan struct{}, 1),
194201
}
195202

196203
c.cache[k] = entry
@@ -202,10 +209,10 @@ func (c *linkAddrCache) makeAndAddEntry(k tcpip.FullAddress, v tcpip.LinkAddress
202209
}
203210

204211
// get reports any known link address for k.
205-
func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) (tcpip.LinkAddress, *tcpip.Error) {
212+
func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) (tcpip.LinkAddress, <-chan struct{}, *tcpip.Error) {
206213
if linkRes != nil {
207214
if addr, ok := linkRes.ResolveStaticAddress(k.Addr); ok {
208-
return addr, nil
215+
return addr, nil, nil
209216
}
210217
}
211218

@@ -214,10 +221,11 @@ func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, lo
214221
if entry == nil || entry.state() == expired {
215222
c.mu.Unlock()
216223
if linkRes == nil {
217-
return "", tcpip.ErrNoLinkAddress
224+
return "", nil, tcpip.ErrNoLinkAddress
218225
}
219-
c.startAddressResolution(k, linkRes, localAddr, linkEP, waker)
220-
return "", tcpip.ErrWouldBlock
226+
227+
ch := c.startAddressResolution(k, linkRes, localAddr, linkEP, waker)
228+
return "", ch, tcpip.ErrWouldBlock
221229
}
222230
defer c.mu.Unlock()
223231

@@ -227,13 +235,13 @@ func (c *linkAddrCache) get(k tcpip.FullAddress, linkRes LinkAddressResolver, lo
227235
// in that case it's safe to consider it ready.
228236
fallthrough
229237
case ready:
230-
return entry.linkAddr, nil
238+
return entry.linkAddr, nil, nil
231239
case failed:
232-
return "", tcpip.ErrNoLinkAddress
240+
return "", nil, tcpip.ErrNoLinkAddress
233241
case incomplete:
234242
// Address resolution is still in progress.
235243
entry.addWaker(waker)
236-
return "", tcpip.ErrWouldBlock
244+
return "", entry.resCh, tcpip.ErrWouldBlock
237245
default:
238246
panic(fmt.Sprintf("invalid cache entry state: %d", s))
239247
}
@@ -249,13 +257,13 @@ func (c *linkAddrCache) removeWaker(k tcpip.FullAddress, waker *sleep.Waker) {
249257
}
250258
}
251259

252-
func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) {
260+
func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes LinkAddressResolver, localAddr tcpip.Address, linkEP LinkEndpoint, waker *sleep.Waker) <-chan struct{} {
253261
c.mu.Lock()
254262
defer c.mu.Unlock()
255263

256264
// Look up again with lock held to ensure entry wasn't added by someone else.
257265
if e := c.cache[k]; e != nil && e.state() != expired {
258-
return
266+
return nil
259267
}
260268

261269
// Add 'incomplete' entry in the cache to mark that resolution is in progress.
@@ -274,13 +282,23 @@ func (c *linkAddrCache) startAddressResolution(k tcpip.FullAddress, linkRes Link
274282
select {
275283
case <-time.After(c.resolutionTimeout):
276284
if stop := c.checkLinkRequest(k, i); stop {
285+
// If entry is evicted then resCh is already closed.
286+
c.mu.Lock()
287+
if e, ok := c.cache[k]; ok {
288+
if !e.resDone {
289+
e.resDone = true
290+
close(e.resCh)
291+
}
292+
}
293+
c.mu.Unlock()
277294
return
278295
}
279296
case <-cancel:
280297
return
281298
}
282299
}
283300
}()
301+
return e.resCh
284302
}
285303

286304
// checkLinkRequest checks whether previous attempt to resolve address has succeeded

0 commit comments

Comments
 (0)