diff options
author | David S. Miller <davem@davemloft.net> | 2015-07-30 17:25:15 -0700 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2015-07-30 17:25:15 -0700 |
commit | 1df33a11454de804661c8e19cd0e464914eefc6d (patch) | |
tree | 7d1135b020321fd3bd06f9bd6dbc1837380858f8 /net/tipc/node.c | |
parent | 29a3060aa7cc2e5cfaabeb935fafb832b9b33ad4 (diff) | |
parent | 440d8963cd590ec9387d76a36e60c02da9ed944d (diff) | |
download | talos-op-linux-1df33a11454de804661c8e19cd0e464914eefc6d.tar.gz talos-op-linux-1df33a11454de804661c8e19cd0e464914eefc6d.zip |
Merge branch 'tipc-next'
Jon Maloy says:
====================
tipc: separate link aggregation from link layer
We continue the work on separating the roles of the link aggregation and
link layers, as well as making code cleanups in general.
This second commit batch focuses on moving the orchestration of link
failover and synchronization to the node level, as well as preparing the
node lock structure for further future impovements. We also make some
changes to message delivery between link and socket layer, in order to
make this mechanism safer and less obscure.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r-- | net/tipc/node.c | 663 |
1 files changed, 503 insertions, 160 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c index e92f84afbf95..7c191641b44f 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -42,7 +42,36 @@ #include "bcast.h" #include "discover.h" -static void node_lost_contact(struct tipc_node *n_ptr); +/* Node FSM states and events: + */ +enum { + SELF_DOWN_PEER_DOWN = 0xdd, + SELF_UP_PEER_UP = 0xaa, + SELF_DOWN_PEER_LEAVING = 0xd1, + SELF_UP_PEER_COMING = 0xac, + SELF_COMING_PEER_UP = 0xca, + SELF_LEAVING_PEER_DOWN = 0x1d, + NODE_FAILINGOVER = 0xf0, + NODE_SYNCHING = 0xcc +}; + +enum { + SELF_ESTABL_CONTACT_EVT = 0xece, + SELF_LOST_CONTACT_EVT = 0x1ce, + PEER_ESTABL_CONTACT_EVT = 0x9ece, + PEER_LOST_CONTACT_EVT = 0x91ce, + NODE_FAILOVER_BEGIN_EVT = 0xfbe, + NODE_FAILOVER_END_EVT = 0xfee, + NODE_SYNCH_BEGIN_EVT = 0xcbe, + NODE_SYNCH_END_EVT = 0xcee +}; + +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr); +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, + bool delete); +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); @@ -113,7 +142,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr) return NULL; } -struct tipc_node *tipc_node_create(struct net *net, u32 addr) +struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_node *n_ptr, *temp_node; @@ -129,6 +158,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) } n_ptr->addr = addr; n_ptr->net = net; + n_ptr->capabilities = capabilities; kref_init(&n_ptr->kref); spin_lock_init(&n_ptr->lock); INIT_HLIST_NODE(&n_ptr->hash); @@ -249,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) static void tipc_node_timeout(unsigned long data) { struct tipc_node *n = (struct tipc_node *)data; + struct tipc_link_entry *le; struct sk_buff_head xmitq; - struct tipc_link *l; - struct tipc_media_addr *maddr; int bearer_id; int rc = 0; @@ -259,17 +288,16 @@ static void tipc_node_timeout(unsigned long data) for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { tipc_node_lock(n); - l = n->links[bearer_id].link; - if (l) { + le = &n->links[bearer_id]; + if (le->link) { /* Link tolerance may change asynchronously: */ - tipc_node_calculate_timer(n, l); - rc = tipc_link_timeout(l, &xmitq); - if (rc & TIPC_LINK_DOWN_EVT) - tipc_link_reset(l); + tipc_node_calculate_timer(n, le->link); + rc = tipc_link_timeout(le->link, &xmitq); } tipc_node_unlock(n); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_link_down(n, bearer_id, false); } if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); @@ -277,68 +305,92 @@ static void tipc_node_timeout(unsigned long data) } /** - * tipc_node_link_up - handle addition of link - * + * __tipc_node_link_up - handle addition of link + * Node lock must be held by caller * Link becomes active (alone or shared) or standby, depending on its priority. */ -void tipc_node_link_up(struct tipc_node *n, int bearer_id) +static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; - struct tipc_link_entry *links = n->links; - struct tipc_link *l = n->links[bearer_id].link; + struct tipc_link *ol = node_active_link(n, 0); + struct tipc_link *nl = n->links[bearer_id].link; - /* Leave room for tunnel header when returning 'mtu' to users: */ - links[bearer_id].mtu = l->mtu - INT_H_SIZE; + if (!nl || !tipc_link_is_up(nl)) + return; n->working_links++; n->action_flags |= TIPC_NOTIFY_LINK_UP; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = nl->peer_bearer_id << 16 | bearer_id; + + /* Leave room for tunnel header when returning 'mtu' to users: */ + n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; + + tipc_bearer_add_dest(n->net, bearer_id, n->addr); pr_debug("Established link <%s> on network plane %c\n", - l->name, l->net_plane); + nl->name, nl->net_plane); - /* No active links ? => take both active slots */ - if (*slot0 < 0) { + /* First link? => give it both slots */ + if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; + tipc_link_build_bcast_sync_msg(nl, xmitq); node_established_contact(n); return; } - /* Lower prio than current active ? => no slot */ - if (l->priority < links[*slot0].link->priority) { - pr_debug("New link <%s> becomes standby\n", l->name); - return; - } - tipc_link_dup_queue_xmit(links[*slot0].link, l); - - /* Same prio as current active ? => take one slot */ - if (l->priority == links[*slot0].link->priority) { + /* Second link => redistribute slots */ + if (nl->priority > ol->priority) { + pr_debug("Old link <%s> becomes standby\n", ol->name); *slot0 = bearer_id; - return; + *slot1 = bearer_id; + } else if (nl->priority == ol->priority) { + *slot0 = bearer_id; + } else { + pr_debug("New link <%s> is standby\n", nl->name); } - /* Higher prio than current active => take both active slots */ - pr_debug("Old link <%s> now standby\n", links[*slot0].link->name); - *slot0 = bearer_id; - *slot1 = bearer_id; + /* Prepare synchronization with first link */ + tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq); +} + +/** + * tipc_node_link_up - handle addition of link + * + * Link becomes active (alone or shared) or standby, depending on its priority. + */ +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) +{ + tipc_node_lock(n); + __tipc_node_link_up(n, bearer_id, xmitq); + tipc_node_unlock(n); } /** - * tipc_node_link_down - handle loss of link + * __tipc_node_link_down - handle loss of link */ -void tipc_node_link_down(struct tipc_node *n, int bearer_id) +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr) { + struct tipc_link_entry *le = &n->links[*bearer_id]; int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; int i, highest = 0; - struct tipc_link *l, *_l; + struct tipc_link *l, *_l, *tnl; + + l = n->links[*bearer_id].link; + if (!l || tipc_link_is_reset(l)) + return; - l = n->links[bearer_id].link; n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = l->peer_bearer_id << 16 | *bearer_id; + + tipc_bearer_remove_dest(n->net, *bearer_id, n->addr); pr_debug("Lost link <%s> on network plane %c\n", l->name, l->net_plane); @@ -350,6 +402,8 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) _l = n->links[i].link; if (!_l || !tipc_link_is_up(_l)) continue; + if (_l == l) + continue; if (_l->priority < highest) continue; if (_l->priority > highest) { @@ -360,10 +414,43 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) } *slot1 = i; } - if (tipc_node_is_up(n)) - tipc_link_failover_send_queue(l); - else - node_lost_contact(n); + + if (!tipc_node_is_up(n)) { + tipc_link_reset(l); + node_lost_contact(n, &le->inputq); + return; + } + + /* There is still a working link => initiate failover */ + tnl = node_active_link(n, 0); + n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); + tipc_link_reset(l); + tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + *maddr = &n->links[tnl->bearer_id].maddr; + *bearer_id = tnl->bearer_id; +} + +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) +{ + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_media_addr *maddr; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_node_lock(n); + __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); + if (delete && le->link) { + kfree(le->link); + le->link = NULL; + n->link_cnt--; + } + tipc_node_unlock(n); + + tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_sk_rcv(n->net, &le->inputq); } bool tipc_node_is_up(struct tipc_node *n) @@ -371,55 +458,150 @@ bool tipc_node_is_up(struct tipc_node *n) return n->active_links[0] != INVALID_BEARER_ID; } -void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b, - bool *link_up, bool *addr_match, - struct tipc_media_addr *maddr) +void tipc_node_check_dest(struct net *net, u32 onode, + struct tipc_bearer *b, + u16 capabilities, u32 signature, + struct tipc_media_addr *maddr, + bool *respond, bool *dupl_addr) { - struct tipc_link *l = n->links[b->identity].link; - struct tipc_media_addr *curr = &n->links[b->identity].maddr; + struct tipc_node *n; + struct tipc_link *l; + struct tipc_link_entry *le; + bool addr_match = false; + bool sign_match = false; + bool link_up = false; + bool accept_addr = false; + bool reset = true; + + *dupl_addr = false; + *respond = false; + + n = tipc_node_create(net, onode, capabilities); + if (!n) + return; - *link_up = l && tipc_link_is_up(l); - *addr_match = l && !memcmp(curr, maddr, sizeof(*maddr)); -} + tipc_node_lock(n); -bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, - struct tipc_media_addr *maddr) -{ - struct tipc_link *l = n->links[b->identity].link; - struct tipc_media_addr *curr = &n->links[b->identity].maddr; - struct sk_buff_head *inputq = &n->links[b->identity].inputq; + le = &n->links[b->identity]; + + /* Prepare to validate requesting node's signature and media address */ + l = le->link; + link_up = l && tipc_link_is_up(l); + addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr)); + sign_match = (signature == n->signature); + + /* These three flags give us eight permutations: */ + + if (sign_match && addr_match && link_up) { + /* All is fine. Do nothing. */ + reset = false; + } else if (sign_match && addr_match && !link_up) { + /* Respond. The link will come up in due time */ + *respond = true; + } else if (sign_match && !addr_match && link_up) { + /* Peer has changed i/f address without rebooting. + * If so, the link will reset soon, and the next + * discovery will be accepted. So we can ignore it. + * It may also be an cloned or malicious peer having + * chosen the same node address and signature as an + * existing one. + * Ignore requests until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (sign_match && !addr_match && !link_up) { + /* Peer link has changed i/f address without rebooting. + * It may also be a cloned or malicious peer; we can't + * distinguish between the two. + * The signature is correct, so we must accept. + */ + accept_addr = true; + *respond = true; + } else if (!sign_match && addr_match && link_up) { + /* Peer node rebooted. Two possibilities: + * - Delayed re-discovery; this link endpoint has already + * reset and re-established contact with the peer, before + * receiving a discovery message from that node. + * (The peer happened to receive one from this node first). + * - The peer came back so fast that our side has not + * discovered it yet. Probing from this side will soon + * reset the link, since there can be no working link + * endpoint at the peer end, and the link will re-establish. + * Accept the signature, since it comes from a known peer. + */ + n->signature = signature; + } else if (!sign_match && addr_match && !link_up) { + /* The peer node has rebooted. + * Accept signature, since it is a known peer. + */ + n->signature = signature; + *respond = true; + } else if (!sign_match && !addr_match && link_up) { + /* Peer rebooted with new address, or a new/duplicate peer. + * Ignore until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (!sign_match && !addr_match && !link_up) { + /* Peer rebooted with new address, or it is a new peer. + * Accept signature and address. + */ + n->signature = signature; + accept_addr = true; + *respond = true; + } + + if (!accept_addr) + goto exit; + /* Now create new link if not already existing */ if (!l) { - l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq); - if (!l) - return false; + if (n->link_cnt == 2) { + pr_warn("Cannot establish 3rd link to %x\n", n->addr); + goto exit; + } + if (!tipc_link_create(n, b, mod(tipc_net(net)->random), + tipc_own_addr(net), onode, &le->maddr, + &le->inputq, &n->bclink.namedq, &l)) { + *respond = false; + goto exit; + } + tipc_link_reset(l); + le->link = l; + n->link_cnt++; tipc_node_calculate_timer(n, l); - if (n->link_cnt == 1) { + if (n->link_cnt == 1) if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); - } } - memcpy(&l->media_addr, maddr, sizeof(*maddr)); - memcpy(curr, maddr, sizeof(*maddr)); - tipc_link_reset(l); - return true; + memcpy(&le->maddr, maddr, sizeof(*maddr)); +exit: + tipc_node_unlock(n); + if (reset) + tipc_node_link_down(n, b->identity, false); + tipc_node_put(n); } -void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +void tipc_node_delete_links(struct net *net, int bearer_id) { - n_ptr->links[l_ptr->bearer_id].link = l_ptr; - n_ptr->link_cnt++; + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_node *n; + + rcu_read_lock(); + list_for_each_entry_rcu(n, &tn->node_list, list) { + tipc_node_link_down(n, bearer_id, true); + } + rcu_read_unlock(); } -void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +static void tipc_node_reset_links(struct tipc_node *n) { + char addr_string[16]; int i; + pr_warn("Resetting all links to %s\n", + tipc_addr_string_fill(addr_string, n->addr)); + for (i = 0; i < MAX_BEARERS; i++) { - if (l_ptr != n_ptr->links[i].link) - continue; - n_ptr->links[i].link = NULL; - n_ptr->link_cnt--; + tipc_node_link_down(n, i, false); } } @@ -442,8 +624,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_LOST_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_UP_PEER_UP: @@ -454,11 +640,19 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_LOST_CONTACT_EVT: state = SELF_LEAVING_PEER_DOWN; break; + case NODE_SYNCH_BEGIN_EVT: + state = NODE_SYNCHING; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; case SELF_ESTABL_CONTACT_EVT: case PEER_ESTABL_CONTACT_EVT: + case NODE_SYNCH_END_EVT: + case NODE_FAILOVER_END_EVT: break; default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_DOWN_PEER_LEAVING: @@ -470,8 +664,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_ESTABL_CONTACT_EVT: case SELF_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_UP_PEER_COMING: @@ -485,8 +683,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_ESTABL_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_COMING_PEER_UP: @@ -500,8 +702,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_LOST_CONTACT_EVT: case PEER_ESTABL_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_LEAVING_PEER_DOWN: @@ -513,49 +719,85 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_ESTABL_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case NODE_FAILINGOVER: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_FAILOVER_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_SYNCH_BEGIN_EVT: + case NODE_SYNCH_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; + } + break; + case NODE_SYNCHING: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_SYNCH_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; + case NODE_SYNCH_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; } break; default: pr_err("Unknown node fsm state %x\n", state); break; } - n->state = state; + return; + +illegal_evt: + pr_err("Illegal node fsm evt %x in state %x\n", evt, state); } -bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, - struct tipc_msg *hdr) +bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) { int state = n->state; if (likely(state == SELF_UP_PEER_UP)) return true; - if (state == SELF_DOWN_PEER_DOWN) - return true; - - if (state == SELF_UP_PEER_COMING) { - /* If not traffic msg, peer may still be ESTABLISHING */ - if (tipc_link_is_up(l) && msg_is_traffic(hdr)) - tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); - return true; - } - - if (state == SELF_COMING_PEER_UP) - return true; - if (state == SELF_LEAVING_PEER_DOWN) return false; if (state == SELF_DOWN_PEER_LEAVING) { - if (msg_peer_is_up(hdr)) + if (msg_peer_node_is_up(hdr)) return false; - tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); - return true; } - return false; + + return true; } static void node_established_contact(struct tipc_node *n_ptr) @@ -567,10 +809,12 @@ static void node_established_contact(struct tipc_node *n_ptr) tipc_bclink_add_node(n_ptr->net, n_ptr->addr); } -static void node_lost_contact(struct tipc_node *n_ptr) +static void node_lost_contact(struct tipc_node *n_ptr, + struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; + struct tipc_link *l; struct list_head *conns = &n_ptr->conn_sks; struct sk_buff *skb; struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); @@ -596,16 +840,11 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i].link; - if (!l_ptr) - continue; - l_ptr->exec_mode = TIPC_LINK_OPEN; - l_ptr->failover_checkpt = 0; - l_ptr->failover_pkts = 0; - kfree_skb(l_ptr->failover_skb); - l_ptr->failover_skb = NULL; - tipc_link_reset_fragments(l_ptr); + l = n_ptr->links[i].link; + if (l) + tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } + /* Prevent re-contact with node until cleanup is done */ tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); @@ -618,10 +857,8 @@ static void node_lost_contact(struct tipc_node *n_ptr) SHORT_H_SIZE, 0, tn->own_addr, conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); - if (likely(skb)) { - skb_queue_tail(n_ptr->inputq, skb); - n_ptr->action_flags |= TIPC_MSG_EVT; - } + if (likely(skb)) + skb_queue_tail(inputq, skb); list_del(&conn->list); kfree(conn); } @@ -668,27 +905,20 @@ void tipc_node_unlock(struct tipc_node *node) u32 flags = node->action_flags; u32 link_id = 0; struct list_head *publ_list; - struct sk_buff_head *inputq = node->inputq; - struct sk_buff_head *namedq; - if (likely(!flags || (flags == TIPC_MSG_EVT))) { - node->action_flags = 0; + if (likely(!flags)) { spin_unlock_bh(&node->lock); - if (flags == TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); return; } addr = node->addr; link_id = node->link_id; - namedq = node->namedq; publ_list = &node->publ_list; - node->action_flags &= ~(TIPC_MSG_EVT | - TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | + node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | - TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET); + TIPC_BCAST_RESET); spin_unlock_bh(&node->lock); @@ -709,17 +939,11 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, link_id, addr); - if (flags & TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); - - if (flags & TIPC_NAMED_MSG_EVT) - tipc_named_rcv(net, namedq); - if (flags & TIPC_BCAST_MSG_EVT) tipc_bclink_input(net); if (flags & TIPC_BCAST_RESET) - tipc_link_reset_all(node); + tipc_node_reset_links(node); } /* Caller should hold node lock for the passed node */ @@ -796,9 +1020,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, l = tipc_node_select_link(n, selector, &bearer_id, &maddr); if (likely(l)) rc = tipc_link_xmit(l, list, &xmitq); - if (unlikely(rc == -ENOBUFS)) - tipc_link_reset(l); tipc_node_unlock(n); + if (unlikely(rc == -ENOBUFS)) + tipc_node_link_down(n, bearer_id, false); tipc_node_put(n); } if (likely(!rc)) { @@ -835,6 +1059,120 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, } /** + * tipc_node_check_state - check and if necessary update node state + * @skb: TIPC packet + * @bearer_id: identity of bearer delivering the packet + * Returns true if state is ok, otherwise consumes buffer and returns false + */ +static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, + int bearer_id, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + int mtyp = msg_type(hdr); + u16 oseqno = msg_seqno(hdr); + u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); + u16 exp_pkts = msg_msgcnt(hdr); + u16 rcv_nxt, syncpt, dlv_nxt; + int state = n->state; + struct tipc_link *l, *pl = NULL; + struct tipc_media_addr *maddr; + int i, pb_id; + + l = n->links[bearer_id].link; + if (!l) + return false; + rcv_nxt = l->rcv_nxt; + + + if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) + return true; + + /* Find parallel link, if any */ + for (i = 0; i < MAX_BEARERS; i++) { + if ((i != bearer_id) && n->links[i].link) { + pl = n->links[i].link; + break; + } + } + + /* Update node accesibility if applicable */ + if (state == SELF_UP_PEER_COMING) { + if (!tipc_link_is_up(l)) + return true; + if (!msg_peer_link_is_up(hdr)) + return true; + tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); + } + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_node_is_up(hdr)) + return false; + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); + } + + /* Ignore duplicate packets */ + if (less(oseqno, rcv_nxt)) + return true; + + /* Initiate or update failover mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { + syncpt = oseqno + exp_pkts - 1; + if (pl && tipc_link_is_up(pl)) { + pb_id = pl->bearer_id; + __tipc_node_link_down(n, &pb_id, xmitq, &maddr); + tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq); + } + /* If pkts arrive out of order, use lowest calculated syncpt */ + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open parallel link when tunnel link reaches synch point */ + if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) { + if (!more(rcv_nxt, n->sync_point)) + return true; + tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); + if (pl) + tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT); + return true; + } + + /* Initiate or update synch mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { + syncpt = iseqno + exp_pkts - 1; + if (!tipc_link_is_up(l)) { + tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); + __tipc_node_link_up(n, bearer_id, xmitq); + } + if (n->state == SELF_UP_PEER_UP) { + n->sync_point = syncpt; + tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT); + tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); + } + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open tunnel link when parallel link reaches synch point */ + if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { + if (pl) + dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); + if (!pl || more(dlv_nxt, n->sync_point)) { + tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT); + tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); + return true; + } + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) + return true; + if (usr == LINK_PROTOCOL) + return true; + return false; + } + return true; +} + +/** * tipc_rcv - process TIPC packets/messages arriving from off-node * @net: the applicable net namespace * @skb: TIPC packet @@ -847,10 +1185,10 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { struct sk_buff_head xmitq; struct tipc_node *n; - struct tipc_link *l; - struct tipc_msg *hdr; - struct tipc_media_addr *maddr; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); int bearer_id = b->identity; + struct tipc_link_entry *le; int rc = 0; __skb_queue_head_init(&xmitq); @@ -860,9 +1198,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; /* Handle arrival of a non-unicast link packet */ - hdr = buf_msg(skb); if (unlikely(msg_non_seq(hdr))) { - if (msg_user(hdr) == LINK_CONFIG) + if (usr == LINK_CONFIG) tipc_disc_rcv(net, skb, b); else tipc_bclink_rcv(net, skb); @@ -873,38 +1210,44 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) n = tipc_node_find(net, msg_prevnode(hdr)); if (unlikely(!n)) goto discard; + le = &n->links[bearer_id]; + tipc_node_lock(n); - /* Locate link endpoint that should handle packet */ - l = n->links[bearer_id].link; - if (unlikely(!l)) + /* Is reception permitted at the moment ? */ + if (!tipc_node_filter_pkt(n, hdr)) goto unlock; - /* Is reception of this packet permitted at the moment ? */ - if (unlikely(n->state != SELF_UP_PEER_UP)) - if (!tipc_node_filter_skb(n, l, hdr)) - goto unlock; - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); - /* Release acked broadcast messages */ + /* Release acked broadcast packets */ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); - /* Check protocol and update link state */ - rc = tipc_link_rcv(l, skb, &xmitq); + /* Check and if necessary update node state */ + if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { + rc = tipc_link_rcv(le->link, skb, &xmitq); + skb = NULL; + } +unlock: + tipc_node_unlock(n); if (unlikely(rc & TIPC_LINK_UP_EVT)) - tipc_link_activate(l); + tipc_node_link_up(n, bearer_id, &xmitq); + if (unlikely(rc & TIPC_LINK_DOWN_EVT)) - tipc_link_reset(l); - skb = NULL; -unlock: - tipc_node_unlock(n); - tipc_sk_rcv(net, &n->links[bearer_id].inputq); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + tipc_node_link_down(n, bearer_id, false); + + if (unlikely(!skb_queue_empty(&n->bclink.namedq))) + tipc_named_rcv(net, &n->bclink.namedq); + + if (!skb_queue_empty(&le->inputq)) + tipc_sk_rcv(net, &le->inputq); + + if (!skb_queue_empty(&xmitq)) + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + tipc_node_put(n); discard: kfree_skb(skb); |