Skip to content

Commit bbd6039

Browse files
committed
add SearchAsync
Signed-off-by: Adphi <[email protected]>
1 parent 0e43630 commit bbd6039

File tree

7 files changed

+401
-8
lines changed

7 files changed

+401
-8
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea

client.go

+1
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,6 @@ type Client interface {
3131
PasswordModify(*PasswordModifyRequest) (*PasswordModifyResult, error)
3232

3333
Search(*SearchRequest) (*SearchResult, error)
34+
SearchAsync(searchRequest *SearchRequest, done chan struct{}) (<-chan *SearchAsyncResponse, error)
3435
SearchWithPaging(searchRequest *SearchRequest, pagingSize uint32) (*SearchResult, error)
3536
}

ldap_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,90 @@ func TestSearch(t *testing.T) {
9494
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(sr.Entries))
9595
}
9696

97+
func TestSearchAsync(t *testing.T) {
98+
l, err := DialURL(ldapServer)
99+
if err != nil {
100+
t.Fatal(err)
101+
}
102+
defer l.Close()
103+
104+
searchRequest := NewSearchRequest(
105+
baseDN,
106+
ScopeWholeSubtree, DerefAlways, 0, 0, false,
107+
filter[0],
108+
attributes,
109+
nil)
110+
111+
var entries []*Entry
112+
responses, err := l.SearchAsync(searchRequest, nil)
113+
if err != nil {
114+
t.Fatal(err)
115+
}
116+
for res := range responses {
117+
if err := res.Err(); err != nil {
118+
t.Error(err)
119+
break
120+
}
121+
if res.Closed() {
122+
break
123+
}
124+
switch res.Type {
125+
case SearchAsyncResponseTypeEntry:
126+
entries = append(entries, res.Entry)
127+
case SearchAsyncResponseTypeReferral:
128+
t.Logf("Received Referral: %s", res.Referral)
129+
case SearchAsyncResponseTypeControl:
130+
t.Logf("Received Control: %s", res.Control)
131+
}
132+
}
133+
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(entries))
134+
}
135+
136+
func TestSearchAsyncStop(t *testing.T) {
137+
l, err := DialURL(ldapServer)
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
defer l.Close()
142+
143+
searchRequest := NewSearchRequest(
144+
baseDN,
145+
ScopeWholeSubtree, DerefAlways, 0, 0, false,
146+
filter[0],
147+
attributes,
148+
nil)
149+
150+
var entries []*Entry
151+
done := make(chan struct{})
152+
responses, err := l.SearchAsync(searchRequest, done)
153+
if err != nil {
154+
t.Fatal(err)
155+
}
156+
for res := range responses {
157+
if err := res.Err(); err != nil {
158+
t.Error(err)
159+
break
160+
}
161+
162+
if res.Closed() {
163+
break
164+
}
165+
close(done)
166+
switch res.Type {
167+
case SearchAsyncResponseTypeEntry:
168+
entries = append(entries, res.Entry)
169+
case SearchAsyncResponseTypeReferral:
170+
t.Logf("Received Referral: %s", res.Referral)
171+
case SearchAsyncResponseTypeControl:
172+
t.Logf("Received Control: %s", res.Control)
173+
}
174+
}
175+
if len(entries) > 1 {
176+
t.Errorf("Expected 1 entry, got %d", len(entries))
177+
}
178+
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(entries))
179+
}
180+
97181
func TestSearchStartTLS(t *testing.T) {
98182
l, err := DialURL(ldapServer)
99183
if err != nil {

search.go

+115-4
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,42 @@ func (s *SearchResult) PrettyPrint(indent int) {
338338
}
339339
}
340340

341+
// SearchAsyncResponseType describes the SearchAsyncResponse content type
342+
type SearchAsyncResponseType uint8
343+
344+
const (
345+
SearchAsyncResponseTypeNone SearchAsyncResponseType = iota
346+
SearchAsyncResponseTypeEntry
347+
SearchAsyncResponseTypeReferral
348+
SearchAsyncResponseTypeControl
349+
)
350+
351+
// SearchAsyncResponse holds the server's response message to an async search request
352+
type SearchAsyncResponse struct {
353+
// Type indicates the SearchAsyncResponse type
354+
Type SearchAsyncResponseType
355+
// Entry is the received entry, only set if Type is SearchAsyncResponseTypeEntry
356+
Entry *Entry
357+
// Referral is the received referral, only set if Type is SearchAsyncResponseTypeReferral
358+
Referral string
359+
// Control is the received control, only set if Type is SearchAsyncResponseTypeControl
360+
Control Control
361+
// closed indicates that the request is finished
362+
closed bool
363+
// err holds the encountered error while processing server's response, if any
364+
err error
365+
}
366+
367+
// Closed returns true if the request is finished
368+
func (r *SearchAsyncResponse) Closed() bool {
369+
return r.closed
370+
}
371+
372+
// Err returns the encountered error while processing server's response, if any
373+
func (r *SearchAsyncResponse) Err() error {
374+
return r.err
375+
}
376+
341377
// SearchRequest represents a search request to send to the server
342378
type SearchRequest struct {
343379
BaseDN string
@@ -405,10 +441,11 @@ func NewSearchRequest(
405441
// SearchWithPaging accepts a search request and desired page size in order to execute LDAP queries to fulfill the
406442
// search request. All paged LDAP query responses will be buffered and the final result will be returned atomically.
407443
// The following four cases are possible given the arguments:
408-
// - given SearchRequest missing a control of type ControlTypePaging: we will add one with the desired paging size
409-
// - given SearchRequest contains a control of type ControlTypePaging that isn't actually a ControlPaging: fail without issuing any queries
410-
// - given SearchRequest contains a control of type ControlTypePaging with pagingSize equal to the size requested: no change to the search request
411-
// - given SearchRequest contains a control of type ControlTypePaging with pagingSize not equal to the size requested: fail without issuing any queries
444+
// - given SearchRequest missing a control of type ControlTypePaging: we will add one with the desired paging size
445+
// - given SearchRequest contains a control of type ControlTypePaging that isn't actually a ControlPaging: fail without issuing any queries
446+
// - given SearchRequest contains a control of type ControlTypePaging with pagingSize equal to the size requested: no change to the search request
447+
// - given SearchRequest contains a control of type ControlTypePaging with pagingSize not equal to the size requested: fail without issuing any queries
448+
//
412449
// A requested pagingSize of 0 is interpreted as no limit by LDAP servers.
413450
func (l *Conn) SearchWithPaging(searchRequest *SearchRequest, pagingSize uint32) (*SearchResult, error) {
414451
var pagingControl *ControlPaging
@@ -519,6 +556,80 @@ func (l *Conn) Search(searchRequest *SearchRequest) (*SearchResult, error) {
519556
}
520557
}
521558

559+
// SearchAsync performs the given search request asynchronously, it takes an optional done channel to stop the request. It returns a SearchAsyncResponse channel which will be
560+
// closed when the request finished and an error, not nil if the request to the server failed
561+
func (l *Conn) SearchAsync(searchRequest *SearchRequest, done chan struct{}) (<-chan *SearchAsyncResponse, error) {
562+
if done == nil {
563+
done = make(chan struct{})
564+
}
565+
msgCtx, err := l.doRequest(searchRequest)
566+
if err != nil {
567+
return nil, err
568+
}
569+
responses := make(chan *SearchAsyncResponse)
570+
ch := make(chan *SearchAsyncResponse)
571+
rcv := func() {
572+
for {
573+
packet, err := l.readPacket(msgCtx)
574+
if err != nil {
575+
ch <- &SearchAsyncResponse{closed: true, err: err}
576+
return
577+
}
578+
579+
switch packet.Children[1].Tag {
580+
case 4:
581+
entry := &Entry{
582+
DN: packet.Children[1].Children[0].Value.(string),
583+
Attributes: unpackAttributes(packet.Children[1].Children[1].Children),
584+
}
585+
ch <- &SearchAsyncResponse{Type: SearchAsyncResponseTypeEntry, Entry: entry}
586+
case 5:
587+
err := GetLDAPError(packet)
588+
if err != nil {
589+
ch <- &SearchAsyncResponse{closed: true, err: err}
590+
return
591+
}
592+
var response SearchAsyncResponse
593+
if len(packet.Children) == 3 {
594+
for _, child := range packet.Children[2].Children {
595+
decodedChild, err := DecodeControl(child)
596+
if err != nil {
597+
responses <- &SearchAsyncResponse{closed: true, err: fmt.Errorf("failed to decode child control: %s", err)}
598+
return
599+
}
600+
response = SearchAsyncResponse{Type: SearchAsyncResponseTypeControl, Control: decodedChild}
601+
}
602+
}
603+
response.closed = true
604+
ch <- &response
605+
return
606+
case 19:
607+
ch <- &SearchAsyncResponse{Type: SearchAsyncResponseTypeReferral, Referral: packet.Children[1].Children[0].Value.(string)}
608+
}
609+
}
610+
}
611+
go func() {
612+
defer l.finishMessage(msgCtx)
613+
defer close(responses)
614+
go rcv()
615+
for {
616+
select {
617+
case <-done:
618+
responses <- &SearchAsyncResponse{
619+
closed: true,
620+
}
621+
return
622+
case res := <-ch:
623+
responses <- res
624+
if res.Closed() {
625+
return
626+
}
627+
}
628+
}
629+
}()
630+
return responses, nil
631+
}
632+
522633
// unpackAttributes will extract all given LDAP attributes and it's values
523634
// from the ber.Packet
524635
func unpackAttributes(children []*ber.Packet) []*EntryAttribute {

v3/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,6 @@ type Client interface {
3131
PasswordModify(*PasswordModifyRequest) (*PasswordModifyResult, error)
3232

3333
Search(*SearchRequest) (*SearchResult, error)
34+
SearchAsync(searchRequest *SearchRequest, done chan struct{}) (<-chan *SearchAsyncResponse, error)
3435
SearchWithPaging(searchRequest *SearchRequest, pagingSize uint32) (*SearchResult, error)
3536
}

v3/ldap_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,90 @@ func TestSearch(t *testing.T) {
9494
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(sr.Entries))
9595
}
9696

97+
func TestSearchAsync(t *testing.T) {
98+
l, err := DialURL(ldapServer)
99+
if err != nil {
100+
t.Fatal(err)
101+
}
102+
defer l.Close()
103+
104+
searchRequest := NewSearchRequest(
105+
baseDN,
106+
ScopeWholeSubtree, DerefAlways, 0, 0, false,
107+
filter[0],
108+
attributes,
109+
nil)
110+
111+
var entries []*Entry
112+
responses, err := l.SearchAsync(searchRequest, nil)
113+
if err != nil {
114+
t.Fatal(err)
115+
}
116+
for res := range responses {
117+
if err := res.Err(); err != nil {
118+
t.Error(err)
119+
break
120+
}
121+
if res.Closed() {
122+
break
123+
}
124+
switch res.Type {
125+
case SearchAsyncResponseTypeEntry:
126+
entries = append(entries, res.Entry)
127+
case SearchAsyncResponseTypeReferral:
128+
t.Logf("Received Referral: %s", res.Referral)
129+
case SearchAsyncResponseTypeControl:
130+
t.Logf("Received Control: %s", res.Control)
131+
}
132+
}
133+
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(entries))
134+
}
135+
136+
func TestSearchAsyncStop(t *testing.T) {
137+
l, err := DialURL(ldapServer)
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
defer l.Close()
142+
143+
searchRequest := NewSearchRequest(
144+
baseDN,
145+
ScopeWholeSubtree, DerefAlways, 0, 0, false,
146+
filter[0],
147+
attributes,
148+
nil)
149+
150+
var entries []*Entry
151+
done := make(chan struct{})
152+
responses, err := l.SearchAsync(searchRequest, done)
153+
if err != nil {
154+
t.Fatal(err)
155+
}
156+
for res := range responses {
157+
if err := res.Err(); err != nil {
158+
t.Error(err)
159+
break
160+
}
161+
162+
if res.Closed() {
163+
break
164+
}
165+
close(done)
166+
switch res.Type {
167+
case SearchAsyncResponseTypeEntry:
168+
entries = append(entries, res.Entry)
169+
case SearchAsyncResponseTypeReferral:
170+
t.Logf("Received Referral: %s", res.Referral)
171+
case SearchAsyncResponseTypeControl:
172+
t.Logf("Received Control: %s", res.Control)
173+
}
174+
}
175+
if len(entries) > 1 {
176+
t.Errorf("Expected 1 entry, got %d", len(entries))
177+
}
178+
t.Logf("TestSearch: %s -> num of entries = %d", searchRequest.Filter, len(entries))
179+
}
180+
97181
func TestSearchStartTLS(t *testing.T) {
98182
l, err := DialURL(ldapServer)
99183
if err != nil {

0 commit comments

Comments
 (0)