diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 140 |
1 files changed, 139 insertions, 1 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index ad2c57f5868d..68d2afb44f2f 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -850,6 +850,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) return res; } +/* tipc_link_cong: determine return value and how to treat the + * sent buffer during link congestion. + * - For plain, errorless user data messages we keep the buffer and + * return -ELINKONG. + * - For all other messages we discard the buffer and return -EHOSTUNREACH + * - For TIPC internal messages we also reset the link + */ +static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + uint psz = msg_size(msg); + uint imp = tipc_msg_tot_importance(msg); + u32 oport = msg_tot_origport(msg); + + if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { + if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) { + link_schedule_port(link, oport, psz); + return -ELINKCONG; + } + } else { + pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); + tipc_link_reset(link); + } + kfree_skb_list(buf); + return -EHOSTUNREACH; +} + +/** + * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked + * @link: link to use + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket + * user data messages) or -EHOSTUNREACH (all other messages/senders) + * Only the socket functions tipc_send_stream() and tipc_send_packet() need + * to act on the return value, since they may need to do more send attempts. + */ +int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + uint psz = msg_size(msg); + uint qsz = link->out_queue_size; + uint sndlim = link->queue_limit[0]; + uint imp = tipc_msg_tot_importance(msg); + uint mtu = link->max_pkt; + uint ack = mod(link->next_in_no - 1); + uint seqno = link->next_out_no; + uint bc_last_in = link->owner->bclink.last_in; + struct tipc_media_addr *addr = &link->media_addr; + struct sk_buff *next = buf->next; + + /* Match queue limits against msg importance: */ + if (unlikely(qsz >= link->queue_limit[imp])) + return tipc_link_cong(link, buf); + + /* Has valid packet limit been used ? */ + if (unlikely(psz > mtu)) { + kfree_skb_list(buf); + return -EMSGSIZE; + } + + /* Prepare each packet for sending, and add to outqueue: */ + while (buf) { + next = buf->next; + msg = buf_msg(buf); + msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); + msg_set_bcast_ack(msg, bc_last_in); + + if (!link->first_out) { + link->first_out = buf; + } else if (qsz < sndlim) { + link->last_out->next = buf; + } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { + link->stats.sent_bundled++; + buf = next; + next = buf->next; + continue; + } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { + link->stats.sent_bundled++; + link->stats.sent_bundles++; + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } else { + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } + + /* Send packet if possible: */ + if (likely(++qsz <= sndlim)) { + tipc_bearer_send(link->bearer_id, buf, addr); + link->next_out = next; + link->unacked_window = 0; + } + seqno++; + link->last_out = buf; + buf = next; + } + link->next_out_no = seqno; + link->out_queue_size = qsz; + return 0; +} + +/** + * tipc_link_xmit2() is the general link level function for message sending + * @buf: chain of buffers containing message + * @dsz: amount of user data to be sent + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector) +{ + struct tipc_link *link = NULL; + struct tipc_node *node; + int rc = -EHOSTUNREACH; + + node = tipc_node_find(dnode); + if (node) { + tipc_node_lock(node); + link = node->active_links[selector & 1]; + if (link) + rc = __tipc_link_xmit2(link, buf); + tipc_node_unlock(node); + } + + if (link) + return rc; + + if (likely(in_own_node(dnode))) + return tipc_sk_rcv(buf); + + kfree_skb_list(buf); + return rc; +} + /* * tipc_link_sync_xmit - synchronize broadcast link endpoints. * @@ -1238,7 +1376,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); + msg_set_type(msg, BUNDLE_CLOSED); l_ptr->next_out = buf->next; return 0; } |