Skip to content

Commit af9b028

Browse files
Jon Paul Maloy authored and davem330 (David S. Miller) committed
tipc: make media xmit call outside node spinlock context
Currently, message sending is performed through a deep call chain, where the node spinlock is grabbed and held during a significant part of the transmission time. This is clearly detrimental to overall throughput performance; it would be better if we could send the message after the spinlock has been released. In this commit, we do instead let the call revert on the stack after the buffer chain has been added to the transmission queue, whereafter clones of the buffers are transmitted to the device layer outside the spinlock scope. As a further step in our effort to separate the roles of the node and link entities we also move the function tipc_link_xmit() to node.c, and rename it to tipc_node_xmit(). Reviewed-by: Ying Xue <[email protected]> Signed-off-by: Jon Maloy <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent 22d85c7 commit af9b028

File tree

8 files changed

+198
-77
lines changed

8 files changed

+198
-77
lines changed

net/tipc/bearer.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
470470
rcu_read_unlock();
471471
}
472472

473+
/* tipc_bearer_xmit() - send buffer chain to destination over bearer
 * @net: the applicable net namespace
 * @bearer_id: id of the bearer to transmit on
 * @xmitq: queue of buffers to send; emptied (consumed) by this call
 * @dst: media address of the destination
 *
 * Runs outside the node spinlock; the bearer is looked up under RCU so
 * a concurrent bearer removal cannot free it while we are transmitting.
 */
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
		      struct sk_buff_head *xmitq,
		      struct tipc_media_addr *dst)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bearer *b;
	struct sk_buff *skb, *tmp;

	/* Nothing queued for this destination: avoid taking the RCU lock */
	if (skb_queue_empty(xmitq))
		return;

	rcu_read_lock();
	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (likely(b)) {
		skb_queue_walk_safe(xmitq, skb, tmp) {
			/* skb is always the current head, so this unlinks it */
			__skb_dequeue(xmitq);
			b->media->send_msg(net, skb, b, dst);
			/* Until we remove cloning in tipc_l2_send_msg(): */
			kfree_skb(skb);
		}
	}
	rcu_read_unlock();
}
498+
473499
/**
474500
* tipc_l2_rcv_msg - handle incoming TIPC message from an interface
475501
* @buf: the received packet

net/tipc/bearer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
217217
void tipc_bearer_stop(struct net *net);
218218
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
219219
struct tipc_media_addr *dest);
220+
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
221+
struct sk_buff_head *xmitq,
222+
struct tipc_media_addr *dst);
220223

221224
#endif /* _TIPC_BEARER_H */

net/tipc/link.c

Lines changed: 72 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
353353
/* This really cannot happen... */
354354
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
355355
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
356-
tipc_link_reset(link);
357356
return -ENOBUFS;
358357
}
359358
/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
701700
return 0;
702701
}
703702

703+
/**
 * tipc_link_xmit(): enqueue buffer list according to queue situation
 * @l: link to use
 * @list: chain of buffers containing message
 * @xmitq: returned list of packets to be sent by caller
 *
 * Consumes the buffer chain, except when returning -ELINKCONG,
 * since the caller then may want to make more send attempts.
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 *
 * Called with the owning node locked; the actual wire transmission of
 * the packets returned in @xmitq is done by the caller after unlock.
 */
int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
		   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb_peek(list));
	unsigned int maxwin = l->window;
	unsigned int i, imp = msg_importance(hdr);
	unsigned int mtu = l->mtu;
	u16 ack = l->rcv_nxt - 1;
	u16 seqno = l->snd_nxt;
	u16 bc_last_in = l->owner->bclink.last_in;
	struct sk_buff_head *transmq = &l->transmq;
	struct sk_buff_head *backlogq = &l->backlogq;
	struct sk_buff *skb, *_skb, *bskb;

	/* Match msg importance against this and all higher backlog limits: */
	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
			return link_schedule_user(l, list);
	}
	if (unlikely(msg_size(hdr) > mtu))
		return -EMSGSIZE;

	/* Prepare each packet for sending, and add to relevant queue: */
	while (skb_queue_len(list)) {
		skb = skb_peek(list);
		hdr = buf_msg(skb);
		msg_set_seqno(hdr, seqno);
		msg_set_ack(hdr, ack);
		msg_set_bcast_ack(hdr, bc_last_in);

		/* Window open: keep original on transmq for possible
		 * retransmission, hand a clone to the caller for the wire
		 */
		if (likely(skb_queue_len(transmq) < maxwin)) {
			_skb = skb_clone(skb, GFP_ATOMIC);
			if (!_skb)
				return -ENOBUFS;
			__skb_dequeue(list);
			__skb_queue_tail(transmq, skb);
			__skb_queue_tail(xmitq, _skb);
			l->rcv_unacked = 0;
			seqno++;
			continue;
		}
		/* Window full: try to append into last backlog bundle */
		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
			kfree_skb(__skb_dequeue(list));
			l->stats.sent_bundled++;
			continue;
		}
		/* Otherwise try to start a new bundle packet */
		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
			kfree_skb(__skb_dequeue(list));
			__skb_queue_tail(backlogq, bskb);
			l->backlog[msg_importance(buf_msg(bskb))].len++;
			l->stats.sent_bundled++;
			l->stats.sent_bundles++;
			continue;
		}
		/* Not bundlable: move remaining chain to backlog as-is */
		l->backlog[imp].len += skb_queue_len(list);
		skb_queue_splice_tail_init(list, backlogq);
	}
	l->snd_nxt = seqno;
	return 0;
}
774+
704775
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
705776
{
706777
skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
715786
return __tipc_link_xmit(link->owner->net, link, &head);
716787
}
717788

718-
/* tipc_link_xmit_skb(): send single buffer to destination
719-
 * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
720-
* messages, which will not cause link congestion
721-
* The only exception is datagram messages rerouted after secondary
722-
* lookup, which are rare and safe to dispose of anyway.
723-
* TODO: Return real return value, and let callers use
724-
* tipc_wait_for_sendpkt() where applicable
725-
*/
726-
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
727-
u32 selector)
728-
{
729-
struct sk_buff_head head;
730-
int rc;
731-
732-
skb2list(skb, &head);
733-
rc = tipc_link_xmit(net, &head, dnode, selector);
734-
if (rc)
735-
kfree_skb(skb);
736-
return 0;
737-
}
738-
739-
/**
740-
* tipc_link_xmit() is the general link level function for message sending
741-
* @net: the applicable net namespace
742-
* @list: chain of buffers containing message
743-
* @dsz: amount of user data to be sent
744-
* @dnode: address of destination node
745-
* @selector: a number used for deterministic link selection
746-
* Consumes the buffer chain, except when returning error
747-
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
748-
*/
749-
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
750-
u32 selector)
751-
{
752-
struct tipc_link *link = NULL;
753-
struct tipc_node *node;
754-
int rc = -EHOSTUNREACH;
755-
756-
node = tipc_node_find(net, dnode);
757-
if (node) {
758-
tipc_node_lock(node);
759-
link = node_active_link(node, selector & 1);
760-
if (link)
761-
rc = __tipc_link_xmit(net, link, list);
762-
tipc_node_unlock(node);
763-
tipc_node_put(node);
764-
}
765-
if (link)
766-
return rc;
767-
768-
if (likely(in_own_node(net, dnode))) {
769-
tipc_sk_rcv(net, list);
770-
return 0;
771-
}
772-
773-
__skb_queue_purge(list);
774-
return rc;
775-
}
776-
777789
/*
778790
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
779791
*

net/tipc/link.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
223223
void tipc_link_purge_backlog(struct tipc_link *l);
224224
void tipc_link_reset_all(struct tipc_node *node);
225225
void tipc_link_reset(struct tipc_link *l_ptr);
226-
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
227-
u32 selector);
228-
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
229-
u32 selector);
230226
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
231227
struct sk_buff_head *list);
228+
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
229+
struct sk_buff_head *xmitq);
232230
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
233231
u32 gap, u32 tolerance, u32 priority);
234232
void tipc_link_push_packets(struct tipc_link *l_ptr);

net/tipc/name_distr.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
102102
if (!oskb)
103103
break;
104104
msg_set_destnode(buf_msg(oskb), dnode);
105-
tipc_link_xmit_skb(net, oskb, dnode, dnode);
105+
tipc_node_xmit_skb(net, oskb, dnode, dnode);
106106
}
107107
rcu_read_unlock();
108108

@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
223223
&tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
224224
rcu_read_unlock();
225225

226-
tipc_link_xmit(net, &head, dnode, dnode);
226+
tipc_node_xmit(net, &head, dnode, dnode);
227227
}
228228

229229
static void tipc_publ_subscribe(struct net *net, struct publication *publ,

net/tipc/node.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,84 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
563563
return -EMSGSIZE;
564564
}
565565

566+
static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
567+
int *bearer_id,
568+
struct tipc_media_addr **maddr)
569+
{
570+
int id = n->active_links[sel & 1];
571+
572+
if (unlikely(id < 0))
573+
return NULL;
574+
575+
*bearer_id = id;
576+
*maddr = &n->links[id].maddr;
577+
return n->links[id].link;
578+
}
579+
580+
/**
581+
* tipc_node_xmit() is the general link level function for message sending
582+
* @net: the applicable net namespace
583+
* @list: chain of buffers containing message
584+
* @dnode: address of destination node
585+
* @selector: a number used for deterministic link selection
586+
* Consumes the buffer chain, except when returning -ELINKCONG
587+
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
588+
*/
589+
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
590+
u32 dnode, int selector)
591+
{
592+
struct tipc_link *l = NULL;
593+
struct tipc_node *n;
594+
struct sk_buff_head xmitq;
595+
struct tipc_media_addr *maddr;
596+
int bearer_id;
597+
int rc = -EHOSTUNREACH;
598+
599+
__skb_queue_head_init(&xmitq);
600+
n = tipc_node_find(net, dnode);
601+
if (likely(n)) {
602+
tipc_node_lock(n);
603+
l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
604+
if (likely(l))
605+
rc = tipc_link_xmit(l, list, &xmitq);
606+
if (unlikely(rc == -ENOBUFS))
607+
tipc_link_reset(l);
608+
tipc_node_unlock(n);
609+
tipc_node_put(n);
610+
}
611+
if (likely(!rc)) {
612+
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
613+
return 0;
614+
}
615+
if (likely(in_own_node(net, dnode))) {
616+
tipc_sk_rcv(net, list);
617+
return 0;
618+
}
619+
return rc;
620+
}
621+
622+
/* tipc_node_xmit_skb(): send single buffer to destination
623+
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
624+
* messages, which will not be rejected
625+
* The only exception is datagram messages rerouted after secondary
626+
* lookup, which are rare and safe to dispose of anyway.
627+
* TODO: Return real return value, and let callers use
628+
* tipc_wait_for_sendpkt() where applicable
629+
*/
630+
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
631+
u32 selector)
632+
{
633+
struct sk_buff_head head;
634+
int rc;
635+
636+
skb_queue_head_init(&head);
637+
__skb_queue_tail(&head, skb);
638+
rc = tipc_node_xmit(net, &head, dnode, selector);
639+
if (rc == -ELINKCONG)
640+
kfree_skb(skb);
641+
return 0;
642+
}
643+
566644
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
567645
{
568646
int err;

net/tipc/node.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
160160
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
161161
char *linkname, size_t len);
162162
void tipc_node_unlock(struct tipc_node *node);
163+
int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
164+
int selector);
165+
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
166+
u32 selector);
163167
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
164168
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
165169

0 commit comments

Comments
 (0)