diff options
Diffstat (limited to 'net/tipc/link.c')
| -rw-r--r-- | net/tipc/link.c | 247 | 
1 files changed, 104 insertions, 143 deletions
| diff --git a/net/tipc/link.c b/net/tipc/link.c index 41cb09aa41de..942491234099 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -113,10 +113,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str);  static void tipc_link_sync_xmit(struct tipc_link *l);  static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); -static int tipc_link_input(struct net *net, struct tipc_link *l, -			   struct sk_buff *buf); -static int tipc_link_prepare_input(struct net *net, struct tipc_link *l, -				   struct sk_buff **buf); +static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); +static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);  /*   *  Simple link routines @@ -318,8 +316,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  	l_ptr->next_out_no = 1;  	__skb_queue_head_init(&l_ptr->outqueue);  	__skb_queue_head_init(&l_ptr->deferred_queue); -	skb_queue_head_init(&l_ptr->waiting_sks); - +	skb_queue_head_init(&l_ptr->wakeupq); +	skb_queue_head_init(&l_ptr->inputq); +	skb_queue_head_init(&l_ptr->namedq);  	link_reset_statistics(l_ptr);  	tipc_node_attach_link(n_ptr, l_ptr);  	setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); @@ -387,7 +386,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,  		return false;  	TIPC_SKB_CB(buf)->chain_sz = chain_sz;  	TIPC_SKB_CB(buf)->chain_imp = imp; -	skb_queue_tail(&link->waiting_sks, buf); +	skb_queue_tail(&link->wakeupq, buf);  	link->stats.link_congs++;  	return true;  } @@ -398,17 +397,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,   * Move a number of waiting users, as permitted by available space in   * the send queue, from link wait queue to node wait queue for wakeup   */ -static void link_prepare_wakeup(struct tipc_link *link) +void link_prepare_wakeup(struct tipc_link *link)  {  	uint pend_qsz = skb_queue_len(&link->outqueue);  	struct sk_buff *skb, *tmp; -	skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { +	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {  		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])  			break;  		pend_qsz += TIPC_SKB_CB(skb)->chain_sz; -		skb_unlink(skb, &link->waiting_sks); -		skb_queue_tail(&link->owner->waiting_sks, skb); +		skb_unlink(skb, &link->wakeupq); +		skb_queue_tail(&link->inputq, skb); +		link->owner->inputq = &link->inputq; +		link->owner->action_flags |= TIPC_MSG_EVT;  	}  } @@ -461,13 +462,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)  		l_ptr->exp_msg_count = START_CHANGEOVER;  	} -	/* Clean up all queues: */ +	/* Clean up all queues, except inputq: */  	__skb_queue_purge(&l_ptr->outqueue);  	__skb_queue_purge(&l_ptr->deferred_queue); -	if (!skb_queue_empty(&l_ptr->waiting_sks)) { -		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); -		owner->action_flags |= TIPC_WAKEUP_USERS; -	} +	skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq); +	if (!skb_queue_empty(&l_ptr->inputq)) +		owner->action_flags |= TIPC_MSG_EVT; +	owner->inputq = &l_ptr->inputq;  	l_ptr->next_out = NULL;  	l_ptr->unacked_window = 0;  	l_ptr->checkpoint = 1; @@ -795,7 +796,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,  static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)  { -	__skb_queue_head_init(list); +	skb_queue_head_init(list);  	__skb_queue_tail(list, skb);  } @@ -841,19 +842,13 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,  			rc = __tipc_link_xmit(net, link, list);  		tipc_node_unlock(node);  	} -  	if (link)  		return rc; -	if (likely(in_own_node(net, dnode))) { -		/* As a node local message chain never contains more than one -		 * buffer, we just need to dequeue one SKB buffer from the -		 * head list. -		 */ -		return tipc_sk_rcv(net, __skb_dequeue(list)); -	} -	__skb_queue_purge(list); +	if (likely(in_own_node(net, dnode))) +		return tipc_sk_rcv(net, list); +	__skb_queue_purge(list);  	return rc;  } @@ -1162,7 +1157,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  		/* Locate unicast link endpoint that should handle message */  		l_ptr = n_ptr->links[b_ptr->identity];  		if (unlikely(!l_ptr)) -			goto unlock_discard; +			goto unlock;  		/* Verify that communication with node is currently allowed */  		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && @@ -1173,7 +1168,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;  		if (tipc_node_blocked(n_ptr)) -			goto unlock_discard; +			goto unlock;  		/* Validate message sequence number info */  		seq_no = msg_seqno(msg); @@ -1197,18 +1192,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  		if (unlikely(l_ptr->next_out))  			tipc_link_push_packets(l_ptr); -		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { +		if (released && !skb_queue_empty(&l_ptr->wakeupq))  			link_prepare_wakeup(l_ptr); -			l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS; -		}  		/* Process the incoming packet */  		if (unlikely(!link_working_working(l_ptr))) {  			if (msg_user(msg) == LINK_PROTOCOL) {  				tipc_link_proto_rcv(l_ptr, skb);  				link_retrieve_defq(l_ptr, &head); -				tipc_node_unlock(n_ptr); -				continue; +				skb = NULL; +				goto unlock;  			}  			/* Traffic message. Conditionally activate link */ @@ -1217,18 +1210,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  			if (link_working_working(l_ptr)) {  				/* Re-insert buffer in front of queue */  				__skb_queue_head(&head, skb); -				tipc_node_unlock(n_ptr); -				continue; +				skb = NULL; +				goto unlock;  			} -			goto unlock_discard; +			goto unlock;  		}  		/* Link is now in state WORKING_WORKING */  		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {  			link_handle_out_of_seq_msg(l_ptr, skb);  			link_retrieve_defq(l_ptr, &head); -			tipc_node_unlock(n_ptr); -			continue; +			skb = NULL; +			goto unlock;  		}  		l_ptr->next_in_no++;  		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) @@ -1238,97 +1231,102 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  			l_ptr->stats.sent_acks++;  			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);  		} - -		if (tipc_link_prepare_input(net, l_ptr, &skb)) { -			tipc_node_unlock(n_ptr); -			continue; -		} -		tipc_node_unlock(n_ptr); - -		if (tipc_link_input(net, l_ptr, skb) != 0) -			goto discard; -		continue; -unlock_discard: +		tipc_link_input(l_ptr, skb); +		skb = NULL; +unlock:  		tipc_node_unlock(n_ptr);  discard: -		kfree_skb(skb); +		if (unlikely(skb)) +			kfree_skb(skb);  	}  } -/** - * tipc_link_prepare_input - process TIPC link messages - * - * returns nonzero if the message was consumed +/* tipc_data_input - deliver data and name distr msgs to upper layer   * + * Consumes buffer if message is of right type   * Node lock must be held   */ -static int tipc_link_prepare_input(struct net *net, struct tipc_link *l, -				   struct sk_buff **buf) +static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)  { -	struct tipc_node *n; -	struct tipc_msg *msg; -	int res = -EINVAL; +	struct tipc_node *node = link->owner; +	struct tipc_msg *msg = buf_msg(skb); +	u32 dport = msg_destport(msg); -	n = l->owner; -	msg = buf_msg(*buf);  	switch (msg_user(msg)) { -	case CHANGEOVER_PROTOCOL: -		if (tipc_link_tunnel_rcv(n, buf)) -			res = 0; -		break; -	case MSG_FRAGMENTER: -		l->stats.recv_fragments++; -		if (tipc_buf_append(&l->reasm_buf, buf)) { -			l->stats.recv_fragmented++; -			res = 0; -		} else if (!l->reasm_buf) { -			tipc_link_reset(l); +	case TIPC_LOW_IMPORTANCE: +	case TIPC_MEDIUM_IMPORTANCE: +	case TIPC_HIGH_IMPORTANCE: +	case TIPC_CRITICAL_IMPORTANCE: +	case CONN_MANAGER: +		if (tipc_skb_queue_tail(&link->inputq, skb, dport)) { +			node->inputq = &link->inputq; +			node->action_flags |= TIPC_MSG_EVT;  		} -		break; -	case MSG_BUNDLER: -		l->stats.recv_bundles++; -		l->stats.recv_bundled += msg_msgcnt(msg); -		res = 0; -		break; +		return true;  	case NAME_DISTRIBUTOR: -		n->bclink.recv_permitted = true; -		res = 0; -		break; +		node->bclink.recv_permitted = true; +		node->namedq = &link->namedq; +		skb_queue_tail(&link->namedq, skb); +		if (skb_queue_len(&link->namedq) == 1) +			node->action_flags |= TIPC_NAMED_MSG_EVT; +		return true; +	case MSG_BUNDLER: +	case CHANGEOVER_PROTOCOL: +	case MSG_FRAGMENTER:  	case BCAST_PROTOCOL: -		tipc_link_sync_rcv(n, *buf); -		break; +		return false;  	default: -		res = 0; -	} -	return res; +		pr_warn("Dropping received illegal msg type\n"); +		kfree_skb(skb); +		return false; +	};  } -/** - * tipc_link_input - Deliver message too higher layers + +/* tipc_link_input - process packet that has passed link protocol check + * + * Consumes buffer + * Node lock must be held   */ -static int tipc_link_input(struct net *net, struct tipc_link *l, -			   struct sk_buff *buf) +static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)  { -	struct tipc_msg *msg = buf_msg(buf); -	int res = 0; +	struct tipc_node *node = link->owner; +	struct tipc_msg *msg = buf_msg(skb); +	struct sk_buff *iskb; +	int pos = 0; + +	if (likely(tipc_data_input(link, skb))) +		return;  	switch (msg_user(msg)) { -	case TIPC_LOW_IMPORTANCE: -	case TIPC_MEDIUM_IMPORTANCE: -	case TIPC_HIGH_IMPORTANCE: -	case TIPC_CRITICAL_IMPORTANCE: -	case CONN_MANAGER: -		tipc_sk_rcv(net, buf); +	case CHANGEOVER_PROTOCOL: +		if (!tipc_link_tunnel_rcv(node, &skb)) +			break; +		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { +			tipc_data_input(link, skb); +			break; +		} +	case MSG_BUNDLER: +		link->stats.recv_bundles++; +		link->stats.recv_bundled += msg_msgcnt(msg); + +		while (tipc_msg_extract(skb, &iskb, &pos)) +			tipc_data_input(link, iskb);  		break; -	case NAME_DISTRIBUTOR: -		tipc_named_rcv(net, buf); +	case MSG_FRAGMENTER: +		link->stats.recv_fragments++; +		if (tipc_buf_append(&link->reasm_buf, &skb)) { +			link->stats.recv_fragmented++; +			tipc_data_input(link, skb); +		} else if (!link->reasm_buf) { +			tipc_link_reset(link); +		}  		break; -	case MSG_BUNDLER: -		tipc_link_bundle_rcv(net, buf); +	case BCAST_PROTOCOL: +		tipc_link_sync_rcv(node, skb);  		break;  	default: -		res = -EINVAL; -	} -	return res; +		break; +	};  }  /** @@ -1779,7 +1777,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,   * @from_pos: offset to extract from   *   * Returns a new message buffer containing an embedded message.  The - * encapsulating message itself is left unchanged. + * encapsulating buffer is left unchanged.   */  static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)  { @@ -1793,8 +1791,6 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)  	return eb;  } - -  /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.   * Owner node is locked.   */ @@ -1893,41 +1889,6 @@ exit:  	return *buf != NULL;  } -/* - *  Bundler functionality: - */ -void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf) -{ -	u32 msgcount = msg_msgcnt(buf_msg(buf)); -	u32 pos = INT_H_SIZE; -	struct sk_buff *obuf; -	struct tipc_msg *omsg; - -	while (msgcount--) { -		obuf = buf_extract(buf, pos); -		if (obuf == NULL) { -			pr_warn("Link unable to unbundle message(s)\n"); -			break; -		} -		omsg = buf_msg(obuf); -		pos += align(msg_size(omsg)); -		if (msg_isdata(omsg)) { -			if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG)) -				tipc_sk_mcast_rcv(net, obuf); -			else -				tipc_sk_rcv(net, obuf); -		} else if (msg_user(omsg) == CONN_MANAGER) { -			tipc_sk_rcv(net, obuf); -		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) { -			tipc_named_rcv(net, obuf); -		} else { -			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg)); -			kfree_skb(obuf); -		} -	} -	kfree_skb(buf); -} -  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)  {  	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; | 

