From ba2bf2185121db74e075c703fbf986761733dd1d Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Fri, 1 Dec 2006 14:47:20 -0800 Subject: ocfs2_dlm: fix cluster-wide refcounting of lock resources This was previously broken and migration of some locks had to be temporarily disabled. We use a new (and backward-incompatible) set of network messages to account for all references to a lock resources held across the cluster. once these are all freed, the master node may then free the lock resource memory once its local references are dropped. Signed-off-by: Kurt Hackel Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/tcp_internal.h | 5 +- fs/ocfs2/dlm/dlmcommon.h | 75 +++++++- fs/ocfs2/dlm/dlmdebug.c | 18 ++ fs/ocfs2/dlm/dlmdomain.c | 117 +++++++++--- fs/ocfs2/dlm/dlmlock.c | 4 + fs/ocfs2/dlm/dlmmaster.c | 394 +++++++++++++++++++++++++++++++++++----- fs/ocfs2/dlm/dlmrecovery.c | 123 ++++++++++++- fs/ocfs2/dlm/dlmthread.c | 167 ++++++++--------- 8 files changed, 729 insertions(+), 174 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index b700dc9624d1..775c911342f4 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -38,6 +38,9 @@ * locking semantics of the file system using the protocol. It should * be somewhere else, I'm sure, but right now it isn't. * + * New in version 6: + * - DLM lockres remote refcount fixes. + * * New in version 5: * - Network timeout checking protocol * @@ -51,7 +54,7 @@ * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 5ULL +#define O2NET_PROTOCOL_VERSION 6ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 6b6ff76538c5..9fa427119a3c 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -222,6 +222,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm, #define DLM_LOCK_RES_DIRTY 0x00000008 #define DLM_LOCK_RES_IN_PROGRESS 0x00000010 #define DLM_LOCK_RES_MIGRATING 0x00000020 +#define DLM_LOCK_RES_DROPPING_REF 0x00000040 /* max milliseconds to wait to sync up a network failure with a node death */ #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) @@ -265,6 +266,8 @@ struct dlm_lock_resource u8 owner; //node which owns the lock resource, or unknown u16 state; char lvb[DLM_LVB_LEN]; + unsigned int inflight_locks; + unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; }; struct dlm_migratable_lock @@ -367,7 +370,7 @@ enum { DLM_CONVERT_LOCK_MSG, /* 504 */ DLM_PROXY_AST_MSG, /* 505 */ DLM_UNLOCK_LOCK_MSG, /* 506 */ - DLM_UNUSED_MSG2, /* 507 */ + DLM_DEREF_LOCKRES_MSG, /* 507 */ DLM_MIGRATE_REQUEST_MSG, /* 508 */ DLM_MIG_LOCKRES_MSG, /* 509 */ DLM_QUERY_JOIN_MSG, /* 510 */ @@ -417,6 +420,9 @@ struct dlm_master_request u8 name[O2NM_MAX_NAME_LEN]; }; +#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001 +#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002 + #define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 #define DLM_ASSERT_MASTER_REQUERY 0x00000002 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 @@ -430,6 +436,8 @@ struct dlm_assert_master u8 name[O2NM_MAX_NAME_LEN]; }; +#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001 + struct dlm_migrate_request { u8 master; @@ -648,6 +656,16 @@ struct dlm_finalize_reco __be32 pad2; }; +struct dlm_deref_lockres +{ + u32 pad1; + u16 pad2; + u8 node_idx; + u8 namelen; + + u8 name[O2NM_MAX_NAME_LEN]; +}; + static inline enum dlm_status __dlm_lockres_state_to_status(struct dlm_lock_resource *res) { @@ -721,8 +739,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); -void dlm_purge_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres); +int dlm_purge_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *lockres); static inline void dlm_lockres_get(struct dlm_lock_resource *res) { /* This is called on every lookup, so it might be worth @@ -733,6 +751,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res); void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); +struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash); struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len, @@ -753,6 +775,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int namelen); +#define dlm_lockres_set_refmap_bit(bit,res) \ + __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__) +#define dlm_lockres_clear_refmap_bit(bit,res) \ + __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__) + +static inline void __dlm_lockres_set_refmap_bit(int bit, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + //printk("%s:%d:%.*s: setting bit %d\n", file, line, + // res->lockname.len, res->lockname.name, bit); + set_bit(bit, res->refmap); +} + +static inline void __dlm_lockres_clear_refmap_bit(int bit, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + //printk("%s:%d:%.*s: clearing bit %d\n", file, line, + // res->lockname.len, res->lockname.name, bit); + clear_bit(bit, res->refmap); +} + +void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + const char *file, + int line); +void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int new_lockres, + const char *file, + int line); +#define dlm_lockres_drop_inflight_ref(d,r) \ + __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__) +#define dlm_lockres_grab_inflight_ref(d,r) \ + __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__) +#define dlm_lockres_grab_inflight_ref_new(d,r) \ + __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__) + void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_do_local_ast(struct dlm_ctxt *dlm, @@ -805,6 +868,7 @@ int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 target); +int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 old_master); @@ -814,6 +878,7 @@ void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res); int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); @@ -856,10 +921,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res) int dlm_init_mle_cache(void); void dlm_destroy_mle_cache(void); void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); +int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); - +int __dlm_lockres_has_locks(struct dlm_lock_resource *res); int __dlm_lockres_unused(struct dlm_lock_resource *res); static inline const char * dlm_lock_mode_name(int mode) diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index 3f6c8d88f7af..1015cc7bf9cb 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res) spin_unlock(&res->spinlock); } +static void dlm_print_lockres_refmap(struct dlm_lock_resource *res) +{ + int bit; + assert_spin_locked(&res->spinlock); + + mlog(ML_NOTICE, " refmap nodes: [ "); + bit = 0; + while (1) { + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit); + if (bit >= O2NM_MAX_NODES) + break; + printk("%u ", bit); + bit++; + } + printk("], inflight=%u\n", res->inflight_locks); +} + void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) { struct list_head *iter2; @@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) res->owner, res->state); mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", res->last_used, list_empty(&res->purge) ? "no" : "yes"); + dlm_print_lockres_refmap(res); mlog(ML_NOTICE, " granted queue: \n"); list_for_each(iter2, &res->granted) { lock = list_entry(iter2, struct dlm_lock, list); diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index f0b25f2dd205..3995de360264 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, hlist_add_head(&res->hash_node, bucket); } -struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, - const char *name, - unsigned int len, - unsigned int hash) +struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash) { struct hlist_head *bucket; struct hlist_node *list; @@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, return NULL; } +/* intended to be called by functions which do not care about lock + * resources which are being purged (most net _handler functions). + * this will return NULL for any lock resource which is found but + * currently in the process of dropping its mastery reference. + * use __dlm_lookup_lockres_full when you need the lock resource + * regardless (e.g. dlm_get_lock_resource) */ +struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, + const char *name, + unsigned int len, + unsigned int hash) +{ + struct dlm_lock_resource *res = NULL; + + mlog_entry("%.*s\n", len, name); + + assert_spin_locked(&dlm->spinlock); + + res = __dlm_lookup_lockres_full(dlm, name, len, hash); + if (res) { + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_DROPPING_REF) { + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + return NULL; + } + spin_unlock(&res->spinlock); + } + + return res; +} + struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len) @@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) wake_up(&dlm_domain_events); } -static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) +static int dlm_migrate_all_locks(struct dlm_ctxt *dlm) { - int i; + int i, num, n, ret = 0; struct dlm_lock_resource *res; + struct hlist_node *iter; + struct hlist_head *bucket; + int dropped; mlog(0, "Migrating locks from domain %s\n", dlm->name); -restart: + + num = 0; spin_lock(&dlm->spinlock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { - while (!hlist_empty(dlm_lockres_hash(dlm, i))) { - res = hlist_entry(dlm_lockres_hash(dlm, i)->first, - struct dlm_lock_resource, hash_node); - /* need reference when manually grabbing lockres */ +redo_bucket: + n = 0; + bucket = dlm_lockres_hash(dlm, i); + iter = bucket->first; + while (iter) { + n++; + res = hlist_entry(iter, struct dlm_lock_resource, + hash_node); dlm_lockres_get(res); - /* this should unhash the lockres - * and exit with dlm->spinlock */ - mlog(0, "purging res=%p\n", res); - if (dlm_lockres_is_dirty(dlm, res)) { - /* HACK! this should absolutely go. - * need to figure out why some empty - * lockreses are still marked dirty */ - mlog(ML_ERROR, "lockres %.*s dirty!\n", - res->lockname.len, res->lockname.name); - - spin_unlock(&dlm->spinlock); - dlm_kick_thread(dlm, res); - wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); - dlm_lockres_put(res); - goto restart; - } - dlm_purge_lockres(dlm, res); + /* migrate, if necessary. this will drop the dlm + * spinlock and retake it if it does migration. */ + dropped = dlm_empty_lockres(dlm, res); + + spin_lock(&res->spinlock); + __dlm_lockres_calc_usage(dlm, res); + iter = res->hash_node.next; + spin_unlock(&res->spinlock); + dlm_lockres_put(res); + + cond_resched_lock(&dlm->spinlock); + + if (dropped) + goto redo_bucket; } + num += n; + mlog(0, "%s: touched %d lockreses in bucket %d " + "(tot=%d)\n", dlm->name, n, i, num); } spin_unlock(&dlm->spinlock); - + wake_up(&dlm->dlm_thread_wq); + + /* let the dlm thread take care of purging, keep scanning until + * nothing remains in the hash */ + if (num) { + mlog(0, "%s: %d lock resources in hash last pass\n", + dlm->name, num); + ret = -EAGAIN; + } mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); + return ret; } static int dlm_no_joining_node(struct dlm_ctxt *dlm) @@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) /* We changed dlm state, notify the thread */ dlm_kick_thread(dlm, NULL); - dlm_migrate_all_locks(dlm); + while (dlm_migrate_all_locks(dlm)) { + mlog(0, "%s: more migration to do\n", dlm->name); + } dlm_mark_domain_leaving(dlm); dlm_leave_domain(dlm); dlm_complete_dlm_shutdown(dlm); @@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) if (status) goto bail; + status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, + sizeof(struct dlm_deref_lockres), + dlm_deref_lockres_handler, + dlm, &dlm->dlm_domain_handlers); + if (status) + goto bail; + status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, sizeof(struct dlm_migrate_request), dlm_migrate_request_handler, diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index e5ca3db197f6..ac91a76b1e78 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, kick_thread = 1; } } + /* reduce the inflight count, this may result in the lockres + * being purged below during calc_usage */ + if (lock->ml.node == dlm->node_num) + dlm_lockres_drop_inflight_ref(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 0ad872055cb3..4645ec2e0fc3 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -99,9 +99,9 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm, int idx); static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags); +static int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags); static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, @@ -237,7 +237,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry **mle, char *name, unsigned int namelen); -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to); static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, @@ -687,6 +688,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, INIT_LIST_HEAD(&res->purge); atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; + res->inflight_locks = 0; kref_init(&res->refs); @@ -700,6 +702,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, res->last_used = 0; memset(res->lvb, 0, DLM_LVB_LEN); + memset(res->refmap, 0, sizeof(res->refmap)); } struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, @@ -722,6 +725,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, return res; } +void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int new_lockres, + const char *file, + int line) +{ + if (!new_lockres) + assert_spin_locked(&res->spinlock); + + if (!test_bit(dlm->node_num, res->refmap)) { + BUG_ON(res->inflight_locks != 0); + dlm_lockres_set_refmap_bit(dlm->node_num, res); + } + res->inflight_locks++; + mlog(0, "%s:%.*s: inflight++: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); +} + +void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + assert_spin_locked(&res->spinlock); + + BUG_ON(res->inflight_locks == 0); + res->inflight_locks--; + mlog(0, "%s:%.*s: inflight--: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); + if (res->inflight_locks == 0) + dlm_lockres_clear_refmap_bit(dlm->node_num, res); + wake_up(&res->wq); +} + /* * lookup a lock resource by name. * may already exist in the hashtable. @@ -752,6 +791,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, unsigned int hash; int tries = 0; int bit, wait_on_recovery = 0; + int drop_inflight_if_nonlocal = 0; BUG_ON(!lockid); @@ -761,9 +801,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); + tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); if (tmpres) { + int dropping_ref = 0; + + spin_lock(&tmpres->spinlock); + if (tmpres->owner == dlm->node_num) { + BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF); + dlm_lockres_grab_inflight_ref(dlm, tmpres); + } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) + dropping_ref = 1; + spin_unlock(&tmpres->spinlock); spin_unlock(&dlm->spinlock); + + /* wait until done messaging the master, drop our ref to allow + * the lockres to be purged, start over. */ + if (dropping_ref) { + spin_lock(&tmpres->spinlock); + __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF); + spin_unlock(&tmpres->spinlock); + dlm_lockres_put(tmpres); + tmpres = NULL; + goto lookup; + } + mlog(0, "found in hash!\n"); if (res) dlm_lockres_put(res); @@ -793,6 +854,7 @@ lookup: spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, dlm->node_num); __dlm_insert_lockres(dlm, res); + dlm_lockres_grab_inflight_ref(dlm, res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* lockres still marked IN_PROGRESS */ @@ -805,29 +867,40 @@ lookup: /* if we found a block, wait for lock to be mastered by another node */ blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); if (blocked) { + int mig; if (mle->type == DLM_MLE_MASTER) { mlog(ML_ERROR, "master entry for nonexistent lock!\n"); BUG(); - } else if (mle->type == DLM_MLE_MIGRATION) { - /* migration is in progress! */ - /* the good news is that we now know the - * "current" master (mle->master). */ - + } + mig = (mle->type == DLM_MLE_MIGRATION); + /* if there is a migration in progress, let the migration + * finish before continuing. we can wait for the absence + * of the MIGRATION mle: either the migrate finished or + * one of the nodes died and the mle was cleaned up. + * if there is a BLOCK here, but it already has a master + * set, we are too late. the master does not have a ref + * for us in the refmap. detach the mle and drop it. + * either way, go back to the top and start over. */ + if (mig || mle->master != O2NM_MAX_NODES) { + BUG_ON(mig && mle->master == dlm->node_num); + /* we arrived too late. the master does not + * have a ref for us. retry. */ + mlog(0, "%s:%.*s: late on %s\n", + dlm->name, namelen, lockid, + mig ? "MIGRATION" : "BLOCK"); spin_unlock(&dlm->master_lock); - assert_spin_locked(&dlm->spinlock); - - /* set the lockres owner and hash it */ - spin_lock(&res->spinlock); - dlm_set_lockres_owner(dlm, res, mle->master); - __dlm_insert_lockres(dlm, res); - spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* master is known, detach */ - dlm_mle_detach_hb_events(dlm, mle); + if (!mig) + dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); mle = NULL; - goto wake_waiters; + /* this is lame, but we cant wait on either + * the mle or lockres waitqueue here */ + if (mig) + msleep(100); + goto lookup; } } else { /* go ahead and try to master lock on this node */ @@ -858,6 +931,13 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); + /* since this lockres is new it doesnt not require the spinlock */ + dlm_lockres_grab_inflight_ref_new(dlm, res); + + /* if this node does not become the master make sure to drop + * this inflight reference below */ + drop_inflight_if_nonlocal = 1; + /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we @@ -910,7 +990,7 @@ redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { - ret = dlm_do_master_request(mle, nodenum); + ret = dlm_do_master_request(res, mle, nodenum); if (ret < 0) mlog_errno(ret); if (mle->master != O2NM_MAX_NODES) { @@ -960,6 +1040,8 @@ wait: wake_waiters: spin_lock(&res->spinlock); + if (res->owner != dlm->node_num && drop_inflight_if_nonlocal) + dlm_lockres_drop_inflight_ref(dlm, res); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -998,7 +1080,7 @@ recheck: /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ if (res->owner != dlm->node_num) { - ret = dlm_do_master_request(mle, res->owner); + ret = dlm_do_master_request(res, mle, res->owner); if (ret < 0) { /* give recovery a chance to run */ mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); @@ -1062,6 +1144,8 @@ recheck: * now tell other nodes that I am * mastering this. */ mle->master = dlm->node_num; + /* ref was grabbed in get_lock_resource + * will be dropped in dlmlock_master */ assert = 1; sleep = 0; } @@ -1087,7 +1171,8 @@ recheck: (atomic_read(&mle->woken) == 1), timeo); if (res->owner == O2NM_MAX_NODES) { - mlog(0, "waiting again\n"); + mlog(0, "%s:%.*s: waiting again\n", dlm->name, + res->lockname.len, res->lockname.name); goto recheck; } mlog(0, "done waiting, master is %u\n", res->owner); @@ -1100,8 +1185,7 @@ recheck: m = dlm->node_num; mlog(0, "about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, m); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, mle->vote_map, 0); + ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0); if (ret) { /* This is a failure in the network path, * not in the response to the assert_master @@ -1117,6 +1201,8 @@ recheck: /* set the lockres owner */ spin_lock(&res->spinlock); + /* mastery reference obtained either during + * assert_master_handler or in get_lock_resource */ dlm_change_lockres_owner(dlm, res, m); spin_unlock(&res->spinlock); @@ -1283,7 +1369,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, * */ -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to) { struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; @@ -1339,6 +1426,9 @@ again: case DLM_MASTER_RESP_YES: set_bit(to, mle->response_map); mlog(0, "node %u is the master, response=YES\n", to); + mlog(0, "%s:%.*s: master node %u now knows I have a " + "reference\n", dlm->name, res->lockname.len, + res->lockname.name, to); mle->master = to; break; case DLM_MASTER_RESP_NO: @@ -1428,8 +1518,10 @@ way_up_top: } if (res->owner == dlm->node_num) { + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, request->node_idx); + dlm_lockres_set_refmap_bit(request->node_idx, res); spin_unlock(&res->spinlock); - // mlog(0, "this node is the master\n"); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); @@ -1477,7 +1569,6 @@ way_up_top: mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { - response = DLM_MASTER_RESP_YES; mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); @@ -1494,6 +1585,10 @@ way_up_top: * go back and clean the mles on any * other nodes */ dispatch_assert = 1; + dlm_lockres_set_refmap_bit(request->node_idx, res); + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, + request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { @@ -1607,15 +1702,17 @@ send_response: * can periodically run all locks owned by this node * and re-assert across the cluster... */ -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags) +int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags) { struct dlm_assert_master assert; int to, tmpret; struct dlm_node_iter iter; int ret = 0; int reassert; + const char *lockname = res->lockname.name; + unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); again: @@ -1647,6 +1744,7 @@ again: mlog(0, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; + r = 0; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " @@ -1661,12 +1759,29 @@ again: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); BUG(); - } else if (r == EAGAIN) { + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT && + !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) { + mlog(ML_ERROR, "%.*s: very strange, " + "master MLE but no lockres on %u\n", + namelen, lockname, to); + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT) { mlog(0, "%.*s: node %u create mles on other " "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } + if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) { + mlog(0, "%.*s: node %u has a reference to this " + "lockres, set the bit in the refmap\n", + namelen, lockname, to); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(to, res); + spin_unlock(&res->spinlock); + } } if (reassert) @@ -1693,7 +1808,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) char *name; unsigned int namelen, hash; u32 flags; - int master_request = 0; + int master_request = 0, have_lockres_ref = 0; int ret = 0; if (!dlm_grab(dlm)) @@ -1864,6 +1979,7 @@ ok: dlm_change_lockres_owner(dlm, res, mle->master); } spin_unlock(&res->spinlock); + have_lockres_ref = 1; } /* master is known, detach if not already detached. @@ -1918,7 +2034,19 @@ done: dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); - ret = EAGAIN; // positive. negative would shoot down the node. + /* positive. negative would shoot down the node. */ + ret |= DLM_ASSERT_RESPONSE_REASSERT; + if (!have_lockres_ref) { + mlog(ML_ERROR, "strange, got assert from %u, MASTER " + "mle present here for %s:%.*s, but no lockres!\n", + assert->node_idx, dlm->name, namelen, name); + } + } + if (have_lockres_ref) { + /* let the master know we have a reference to the lockres */ + ret |= DLM_ASSERT_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: got assert from %u, need a ref\n", + dlm->name, namelen, name, assert->node_idx); } return ret; @@ -2023,9 +2151,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, dlm->node_num); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, - nodemap, flags); + ret = dlm_do_assert_master(dlm, res, nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ if (!dlm_is_host_down(ret)) @@ -2097,6 +2223,104 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, return ret; } +/* + * DLM_DEREF_LOCKRES_MSG + */ + +int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + struct dlm_deref_lockres deref; + int ret = 0, r; + const char *lockname; + unsigned int namelen; + + lockname = res->lockname.name; + namelen = res->lockname.len; + BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + mlog(0, "%s:%.*s: sending deref to %d\n", + dlm->name, namelen, lockname, res->owner); + memset(&deref, 0, sizeof(deref)); + deref.node_idx = dlm->node_num; + deref.namelen = namelen; + memcpy(deref.name, lockname, namelen); + + ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, + &deref, sizeof(deref), res->owner, &r); + if (ret < 0) + mlog_errno(ret); + else if (r < 0) { + /* BAD. other node says I did not have a ref. */ + mlog(ML_ERROR,"while dropping ref on %s:%.*s " + "(master=%u) got %d.\n", dlm->name, namelen, + lockname, res->owner, r); + dlm_print_one_lock_resource(res); + BUG(); + } + return ret; +} + +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data) +{ + struct dlm_ctxt *dlm = data; + struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf; + struct dlm_lock_resource *res = NULL; + char *name; + unsigned int namelen; + int ret = -EINVAL; + u8 node; + unsigned int hash; + + if (!dlm_grab(dlm)) + return 0; + + name = deref->name; + namelen = deref->namelen; + node = deref->node_idx; + + if (namelen > DLM_LOCKID_NAME_MAX) { + mlog(ML_ERROR, "Invalid name length!"); + goto done; + } + if (deref->node_idx >= O2NM_MAX_NODES) { + mlog(ML_ERROR, "Invalid node number: %u\n", node); + goto done; + } + + hash = dlm_lockid_hash(name, namelen); + + spin_lock(&dlm->spinlock); + res = __dlm_lookup_lockres_full(dlm, name, namelen, hash); + if (!res) { + spin_unlock(&dlm->spinlock); + mlog(ML_ERROR, "%s:%.*s: bad lockres name\n", + dlm->name, namelen, name); + goto done; + } + spin_unlock(&dlm->spinlock); + + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + ret = 0; + dlm_lockres_clear_refmap_bit(node, res); + } else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, namelen, + name, node); + __dlm_print_one_lock_resource(res); + } + spin_unlock(&res->spinlock); + + if (!ret) + dlm_lockres_calc_usage(dlm, res); +done: + if (res) + dlm_lockres_put(res); + dlm_put(dlm); + return ret; +} + /* * DLM_MIGRATE_LOCKRES @@ -2376,6 +2600,53 @@ leave: return ret; } +#define DLM_MIGRATION_RETRY_MS 100 + +/* Should be called only after beginning the domain leave process. + * There should not be any remaining locks on nonlocal lock resources, + * and there should be no local locks left on locally mastered resources. + * + * Called with the dlm spinlock held, may drop it to do migration, but + * will re-acquire before exit. + * + * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */ +int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + int ret; + int lock_dropped = 0; + + if (res->owner != dlm->node_num) { + if (!__dlm_lockres_unused(res)) { + mlog(ML_ERROR, "%s:%.*s: this node is not master, " + "trying to free this but locks remain\n", + dlm->name, res->lockname.len, res->lockname.name); + } + goto leave; + } + + /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */ + spin_unlock(&dlm->spinlock); + lock_dropped = 1; + while (1) { + ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES); + if (ret >= 0) + break; + if (ret == -ENOTEMPTY) { + mlog(ML_ERROR, "lockres %.*s still has local locks!\n", + res->lockname.len, res->lockname.name); + BUG(); + } + + mlog(0, "lockres %.*s: migrate failed, " + "retrying\n", res->lockname.len, + res->lockname.name); + msleep(DLM_MIGRATION_RETRY_MS); + } + spin_lock(&dlm->spinlock); +leave: + return lock_dropped; +} + int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) { int ret; @@ -2490,7 +2761,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, { struct list_head *iter, *iter2; struct list_head *queue = &res->granted; - int i; + int i, bit; struct dlm_lock *lock; assert_spin_locked(&res->spinlock); @@ -2508,12 +2779,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(lock->ast_pending); BUG_ON(lock->bast_pending); + dlm_lockres_clear_refmap_bit(lock->ml.node, res); list_del_init(&lock->list); dlm_lock_put(lock); } } queue++; } + bit = 0; + while (1) { + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit); + if (bit >= O2NM_MAX_NODES) + break; + /* do not clear the local node reference, if there is a + * process holding this, let it drop the ref itself */ + if (bit != dlm->node_num) { + mlog(0, "%s:%.*s: node %u had a ref to this " + "migrating lockres, clearing\n", dlm->name, + res->lockname.len, res->lockname.name, bit); + dlm_lockres_clear_refmap_bit(bit, res); + } + bit++; + } } /* for now this is not too intelligent. we will @@ -2601,6 +2888,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, mlog(0, "migrate request (node %u) returned %d!\n", nodenum, status); ret = status; + } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) { + /* during the migration request we short-circuited + * the mastery of the lockres. make sure we have + * a mastery ref for nodenum */ + mlog(0, "%s:%.*s: need ref for node %u\n", + dlm->name, res->lockname.len, res->lockname.name, + nodenum); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(nodenum, res); + spin_unlock(&res->spinlock); } } @@ -2745,7 +3042,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); - __dlm_mle_detach_hb_events(dlm, mle); + /* this was obviously WRONG. mle is uninited here. should be tmp. */ + __dlm_mle_detach_hb_events(dlm, tmp); + ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: master=%u, newmaster=%u, " + "telling master to get ref for cleared out mle " + "during migration\n", dlm->name, namelen, name, + master, new_master); } spin_unlock(&tmp->spinlock); } @@ -2753,6 +3056,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* now add a migration mle to the tail of the list */ dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); mle->new_master = new_master; + /* the new master will be sending an assert master for this. + * at that point we will get the refmap reference */ mle->master = master; /* do this for consistency with other mle types */ set_bit(new_master, mle->maybe_map); @@ -2902,6 +3207,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, clear_bit(dlm->node_num, iter.node_map); spin_unlock(&dlm->spinlock); + /* ownership of the lockres is changing. account for the + * mastery reference here since old_master will briefly have + * a reference after the migration completes */ + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(old_master, res); + spin_unlock(&res->spinlock); + mlog(0, "now time to do a migrate request to other nodes\n"); ret = dlm_do_migrate_request(dlm, res, old_master, dlm->node_num, &iter); @@ -2914,8 +3226,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, res->lockname.len, res->lockname.name); /* this call now finishes out the nodemap * even if one or more nodes die */ - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { /* no longer need to retry. all living nodes contacted. */ @@ -2927,8 +3238,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, set_bit(old_master, iter.node_map); mlog(0, "doing assert master of %.*s back to %u\n", res->lockname.len, res->lockname.name, old_master); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { mlog(0, "assert master to original master failed " diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 367a11e9e2ed..d011a2a22742 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, if (total_locks == mres_total_locks) mres->flags |= DLM_MRES_ALL_DONE; + mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery", + send_to); + /* send it */ ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, sz, send_to, &status); @@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock, return 0; } +static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, + struct dlm_migratable_lockres *mres) +{ + struct dlm_lock dummy; + memset(&dummy, 0, sizeof(dummy)); + dummy.ml.cookie = 0; + dummy.ml.type = LKM_IVMODE; + dummy.ml.convert_type = LKM_IVMODE; + dummy.ml.highest_blocked = LKM_IVMODE; + dummy.lksb = NULL; + dummy.ml.node = dlm->node_num; + dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST); +} + +static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, + struct dlm_migratable_lock *ml, + u8 *nodenum) +{ + if (unlikely(ml->cookie == 0 && + ml->type == LKM_IVMODE && + ml->convert_type == LKM_IVMODE && + ml->highest_blocked == LKM_IVMODE && + ml->list == DLM_BLOCKED_LIST)) { + *nodenum = ml->node; + return 1; + } + return 0; +} int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_migratable_lockres *mres, @@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, goto error; } } + if (total_locks == 0) { + /* send a dummy lock to indicate a mastery reference only */ + mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n", + dlm->name, res->lockname.len, res->lockname.name, + send_to, flags & DLM_MRES_RECOVERY ? "recovery" : + "migration"); + dlm_add_dummy_lock(dlm, mres); + } /* flush any remaining locks */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); if (ret < 0) @@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) /* add an extra ref for just-allocated lockres * otherwise the lockres will be purged immediately */ dlm_lockres_get(res); - } /* at this point we have allocated everything we need, * and we have a hashed lockres with an extra ref and * the proper res->state flags. */ ret = 0; + spin_lock(&res->spinlock); + /* drop this either when master requery finds a different master + * or when a lock is added by the recovery worker */ + dlm_lockres_grab_inflight_ref(dlm, res); if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { /* migration cannot have an unknown master */ BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); @@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) "unknown owner.. will need to requery: " "%.*s\n", mres->lockname_len, mres->lockname); } else { - spin_lock(&res->spinlock); + /* take a reference now to pin the lockres, drop it + * when locks are added in the worker */ dlm_change_lockres_owner(dlm, res, dlm->node_num); - spin_unlock(&res->spinlock); } + spin_unlock(&res->spinlock); /* queue up work for dlm_mig_lockres_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ @@ -1459,6 +1504,9 @@ again: "this node will take it.\n", res->lockname.len, res->lockname.name); } else { + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); mlog(0, "master needs to respond to sender " "that node %u still owns %.*s\n", real_master, res->lockname.len, @@ -1666,10 +1714,25 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, int i, bad; struct list_head *iter; struct dlm_lock *lock = NULL; + u8 from = O2NM_MAX_NODES; + unsigned int added = 0; mlog(0, "running %d locks for this lockres\n", mres->num_locks); for (i=0; inum_locks; i++) { ml = &(mres->ml[i]); + + if (dlm_is_dummy_lock(dlm, ml, &from)) { + /* placeholder, just need to set the refmap bit */ + BUG_ON(mres->num_locks != 1); + mlog(0, "%s:%.*s: dummy lock for %u\n", + dlm->name, mres->lockname_len, mres->lockname, + from); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(from, res); + spin_unlock(&res->spinlock); + added++; + break; + } BUG_ON(ml->highest_blocked != LKM_IVMODE); newlock = NULL; lksb = NULL; @@ -1711,6 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, /* do not alter lock refcount. switching lists. */ list_move_tail(&lock->list, queue); spin_unlock(&res->spinlock); + added++; mlog(0, "just reordered a local lock!\n"); continue; @@ -1817,12 +1881,24 @@ skip_lvb: if (!bad) { dlm_lock_get(newlock); list_add_tail(&newlock->list, queue); + mlog(0, "%s:%.*s: added lock for node %u, " + "setting refmap bit\n", dlm->name, + res->lockname.len, res->lockname.name, ml->node); + dlm_lockres_set_refmap_bit(ml->node, res); + added++; } spin_unlock(&res->spinlock); } mlog(0, "done running all the locks\n"); leave: + /* balance the ref taken when the work was queued */ + if (added > 0) { + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); + } + if (ret < 0) { mlog_errno(ret); if (newlock) @@ -1935,9 +2011,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, if (res->owner == dead_node) { list_del_init(&res->recovering); spin_lock(&res->spinlock); + /* new_master has our reference from + * the lock state sent during recovery */ dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) + if (__dlm_lockres_has_locks(res)) __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -1977,9 +2055,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, dlm_lockres_put(res); } spin_lock(&res->spinlock); + /* new_master has our reference from + * the lock state sent during recovery */ dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) + if (__dlm_lockres_has_locks(res)) __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -2048,6 +2128,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, { struct list_head *iter, *tmpiter; struct dlm_lock *lock; + unsigned int freed = 0; /* this node is the lockres master: * 1) remove any stale locks for the dead node @@ -2062,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } list_for_each_safe(iter, tmpiter, &res->converting) { @@ -2069,6 +2151,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } list_for_each_safe(iter, tmpiter, &res->blocked) { @@ -2076,9 +2159,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, if (lock->ml.node == dead_node) { list_del_init(&lock->list); dlm_lock_put(lock); + freed++; } } + if (freed) { + mlog(0, "%s:%.*s: freed %u locks for dead node %u, " + "dropping ref from lockres\n", dlm->name, + res->lockname.len, res->lockname.name, freed, dead_node); + BUG_ON(!test_bit(dead_node, res->refmap)); + dlm_lockres_clear_refmap_bit(dead_node, res); + } else if (test_bit(dead_node, res->refmap)) { + mlog(0, "%s:%.*s: dead node %u had a ref, but had " + "no locks and had not purged before dying\n", dlm->name, + res->lockname.len, res->lockname.name, dead_node); + dlm_lockres_clear_refmap_bit(dead_node, res); + } + /* do not kick thread yet */ __dlm_dirty_lockres(dlm, res); } @@ -2141,9 +2238,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) spin_lock(&res->spinlock); /* zero the lvb if necessary */ dlm_revalidate_lvb(dlm, res, dead_node); - if (res->owner == dead_node) + if (res->owner == dead_node) { + if (res->state & DLM_LOCK_RES_DROPPING_REF) + mlog(0, "%s:%.*s: owned by " + "dead node %u, this node was " + "dropping its ref when it died. " + "continue, dropping the flag.\n", + dlm->name, res->lockname.len, + res->lockname.name, dead_node); + + /* the wake_up for this will happen when the + * RECOVERING flag is dropped later */ + res->state &= ~DLM_LOCK_RES_DROPPING_REF; + dlm_move_lockres_to_recovery_list(dlm, res); - else if (res->owner == dlm->node_num) { + } else if (res->owner == dlm->node_num) { dlm_free_dead_locks(dlm, res, dead_node); __dlm_lockres_calc_usage(dlm, res); } diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 0c822f3ffb05..620eb824ce1d 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -54,9 +54,6 @@ #include "cluster/masklog.h" static int dlm_thread(void *data); -static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres); - static void dlm_flush_asts(struct dlm_ctxt *dlm); #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) @@ -82,14 +79,33 @@ repeat: current->state = TASK_RUNNING; } - -int __dlm_lockres_unused(struct dlm_lock_resource *res) +int __dlm_lockres_has_locks(struct dlm_lock_resource *res) { if (list_empty(&res->granted) && list_empty(&res->converting) && - list_empty(&res->blocked) && - list_empty(&res->dirty)) - return 1; + list_empty(&res->blocked)) + return 0; + return 1; +} + +/* "unused": the lockres has no locks, is not on the dirty list, + * has no inflight locks (in the gap between mastery and acquiring + * the first lock), and has no bits in its refmap. + * truly ready to be freed. */ +int __dlm_lockres_unused(struct dlm_lock_resource *res) +{ + if (!__dlm_lockres_has_locks(res) && + list_empty(&res->dirty)) { + /* try not to scan the bitmap unless the first two + * conditions are already true */ + int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); + if (bit >= O2NM_MAX_NODES) { + /* since the bit for dlm->node_num is not + * set, inflight_locks better be zero */ + BUG_ON(res->inflight_locks != 0); + return 1; + } + } return 0; } @@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, assert_spin_locked(&res->spinlock); if (__dlm_lockres_unused(res)){ - /* For now, just keep any resource we master */ - if (res->owner == dlm->node_num) - { - if (!list_empty(&res->purge)) { - mlog(0, "we master %s:%.*s, but it is on " - "the purge list. Removing\n", - dlm->name, res->lockname.len, - res->lockname.name); - list_del_init(&res->purge); - dlm->purge_count--; - } - return; - } - if (list_empty(&res->purge)) { - mlog(0, "putting lockres %.*s from purge list\n", - res->lockname.len, res->lockname.name); + mlog(0, "putting lockres %.*s:%p onto purge list\n", + res->lockname.len, res->lockname.name, res); res->last_used = jiffies; + dlm_lockres_get(res); list_add_tail(&res->purge, &dlm->purge_list); dlm->purge_count++; - - /* if this node is not the owner, there is - * no way to keep track of who the owner could be. - * unhash it to avoid serious problems. */ - if (res->owner != dlm->node_num) { - mlog(0, "%s:%.*s: doing immediate " - "purge of lockres owned by %u\n", - dlm->name, res->lockname.len, - res->lockname.name, res->owner); - - dlm_purge_lockres_now(dlm, res); - } } } else if (!list_empty(&res->purge)) { - mlog(0, "removing lockres %.*s from purge list, " - "owner=%u\n", res->lockname.len, res->lockname.name, - res->owner); + mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n", + res->lockname.len, res->lockname.name, res, res->owner); list_del_init(&res->purge); + dlm_lockres_put(res); dlm->purge_count--; } } @@ -163,68 +154,60 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } -/* TODO: Eventual API: Called with the dlm spinlock held, may drop it - * to do migration, but will re-acquire before exit. */ -void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres) +int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { int master; - int ret; - - spin_lock(&lockres->spinlock); - master = lockres->owner == dlm->node_num; - spin_unlock(&lockres->spinlock); - - mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, - lockres->lockname.name, master); + int ret = 0; - /* Non master is the easy case -- no migration required, just - * quit. */ + spin_lock(&res->spinlock); + if (!__dlm_lockres_unused(res)) { + spin_unlock(&res->spinlock); + mlog(0, "%s:%.*s: tried to purge but not unused\n", + dlm->name, res->lockname.len, res->lockname.name); + return -ENOTEMPTY; + } + master = (res->owner == dlm->node_num); if (!master) - goto finish; - - /* Wheee! Migrate lockres here! */ - spin_unlock(&dlm->spinlock); -again: + res->state |= DLM_LOCK_RES_DROPPING_REF; + spin_unlock(&res->spinlock); - ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); - if (ret == -ENOTEMPTY) { - mlog(ML_ERROR, "lockres %.*s still has local locks!\n", - lockres->lockname.len, lockres->lockname.name); + mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len, + res->lockname.name, master); - BUG(); - } else if (ret < 0) { - mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", - lockres->lockname.len, lockres->lockname.name); - msleep(100); - goto again; + if (!master) { + /* drop spinlock to do messaging, retake below */ + spin_unlock(&dlm->spinlock); + /* clear our bit from the master's refmap, ignore errors */ + ret = dlm_drop_lockres_ref(dlm, res); + if (ret < 0) { + mlog_errno(ret); + if (!dlm_is_host_down(ret)) + BUG(); + } + mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n", + dlm->name, res->lockname.len, res->lockname.name, ret); + spin_lock(&dlm->spinlock); } - spin_lock(&dlm->spinlock); - -finish: - if (!list_empty(&lockres->purge)) { - list_del_init(&lockres->purge); + if (!list_empty(&res->purge)) { + mlog(0, "removing lockres %.*s:%p from purgelist, " + "master = %d\n", res->lockname.len, res->lockname.name, + res, master); + list_del_init(&res->purge); + dlm_lockres_put(res); dlm->purge_count--; } - __dlm_unhash_lockres(lockres); -} - -/* make an unused lockres go away immediately. - * as soon as the dlm spinlock is dropped, this lockres - * will not be found. kfree still happens on last put. */ -static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres) -{ - assert_spin_locked(&dlm->spinlock); - assert_spin_locked(&lockres->spinlock); - - BUG_ON(!__dlm_lockres_unused(lockres)); + __dlm_unhash_lockres(res); - if (!list_empty(&lockres->purge)) { - list_del_init(&lockres->purge); - dlm->purge_count--; + /* lockres is not in the hash now. drop the flag and wake up + * any processes waiting in dlm_get_lock_resource. */ + if (!master) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_DROPPING_REF; + spin_unlock(&res->spinlock); + wake_up(&res->wq); } - __dlm_unhash_lockres(lockres); + return 0; } static void dlm_run_purge_list(struct dlm_ctxt *dlm, @@ -268,13 +251,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm, break; } + mlog(0, "removing lockres %.*s:%p from purgelist\n", + lockres->lockname.len, lockres->lockname.name, lockres); list_del_init(&lockres->purge); + dlm_lockres_put(lockres); dlm->purge_count--; /* This may drop and reacquire the dlm spinlock if it * has to do migration. */ mlog(0, "calling dlm_purge_lockres!\n"); - dlm_purge_lockres(dlm, lockres); + if (dlm_purge_lockres(dlm, lockres)) + BUG(); mlog(0, "DONE calling dlm_purge_lockres!\n"); /* Avoid adding any scheduling latencies */ -- cgit v1.2.1 From faf0ec9f13defb57f4269ecb22ed86f2874ee89a Mon Sep 17 00:00:00 2001 From: Adrian Bunk Date: Thu, 14 Dec 2006 00:17:32 +0100 Subject: [PATCH] fs/ocfs2/dlm/: make functions static This patch makes some needlessly global functions static. Signed-off-by: Adrian Bunk Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmcommon.h | 6 ------ fs/ocfs2/dlm/dlmmaster.c | 8 +++++--- fs/ocfs2/dlm/dlmthread.c | 3 ++- 3 files changed, 7 insertions(+), 10 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 9fa427119a3c..04048bb1a1bd 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -739,8 +739,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); -int dlm_purge_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *lockres); static inline void dlm_lockres_get(struct dlm_lock_resource *res) { /* This is called on every lookup, so it might be worth @@ -864,10 +862,6 @@ int dlm_heartbeat_init(struct dlm_ctxt *dlm); void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data); void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data); -int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); -int dlm_migrate_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 target); int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 4645ec2e0fc3..251c48028ea3 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2327,8 +2327,9 @@ done: */ -int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, - u8 target) +static int dlm_migrate_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 target) { struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *oldmle = NULL; @@ -2676,7 +2677,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm, return can_proceed; } -int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { int ret; spin_lock(&res->spinlock); diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 620eb824ce1d..baa99979904c 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -154,7 +154,8 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } -int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +static int dlm_purge_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { int master; int ret = 0; -- cgit v1.2.1 From ddc09c8ddac8d0f170ba8caa8128801f358dccff Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Fri, 5 Jan 2007 15:00:17 -0800 Subject: ocfs2_dlm: Fixes race between migrate and dirty dlmthread was removing lockres' from the dirty list and resetting the dirty flag before shuffling the list. This patch retains the dirty state flag until the lists are shuffled. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmcommon.h | 1 + fs/ocfs2/dlm/dlmmaster.c | 16 +++++++++++++++- fs/ocfs2/dlm/dlmthread.c | 30 +++++++++++++++++------------- 3 files changed, 33 insertions(+), 14 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 04048bb1a1bd..e95ecb2aaf14 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -223,6 +223,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm, #define DLM_LOCK_RES_IN_PROGRESS 0x00000010 #define DLM_LOCK_RES_MIGRATING 0x00000020 #define DLM_LOCK_RES_DROPPING_REF 0x00000040 +#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000 /* max milliseconds to wait to sync up a network failure with a node death */ #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 251c48028ea3..a65a87726d6a 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2707,8 +2707,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, __dlm_lockres_reserve_ast(res); spin_unlock(&res->spinlock); - /* now flush all the pending asts.. hang out for a bit */ + /* now flush all the pending asts */ dlm_kick_thread(dlm, res); + /* before waiting on DIRTY, block processes which may + * try to dirty the lockres before MIGRATING is set */ + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY); + res->state |= DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); + /* now wait on any pending asts and the DIRTY state */ wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); dlm_lockres_release_ast(dlm, res); @@ -2734,6 +2741,13 @@ again: mlog(0, "trying again...\n"); goto again; } + /* now that we are sure the MIGRATING state is there, drop + * the unneded state which blocked threads trying to DIRTY */ + spin_lock(&res->spinlock); + BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY)); + BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING)); + res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); /* did the target go down or die? */ spin_lock(&dlm->spinlock); diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index baa99979904c..3b94e4dec351 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -95,7 +95,7 @@ int __dlm_lockres_has_locks(struct dlm_lock_resource *res) int __dlm_lockres_unused(struct dlm_lock_resource *res) { if (!__dlm_lockres_has_locks(res) && - list_empty(&res->dirty)) { + (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) { /* try not to scan the bitmap unless the first two * conditions are already true */ int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); @@ -455,12 +455,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) assert_spin_locked(&res->spinlock); /* don't shuffle secondary queues */ - if ((res->owner == dlm->node_num) && - !(res->state & DLM_LOCK_RES_DIRTY)) { - /* ref for dirty_list */ - dlm_lockres_get(res); - list_add_tail(&res->dirty, &dlm->dirty_list); - res->state |= DLM_LOCK_RES_DIRTY; + if ((res->owner == dlm->node_num)) { + if (res->state & (DLM_LOCK_RES_MIGRATING | + DLM_LOCK_RES_BLOCK_DIRTY)) + return; + + if (list_empty(&res->dirty)) { + /* ref for dirty_list */ + dlm_lockres_get(res); + list_add_tail(&res->dirty, &dlm->dirty_list); + res->state |= DLM_LOCK_RES_DIRTY; + } } } @@ -639,7 +644,7 @@ static int dlm_thread(void *data) dlm_lockres_get(res); spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_DIRTY; + /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ list_del_init(&res->dirty); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); @@ -663,10 +668,11 @@ static int dlm_thread(void *data) /* it is now ok to move lockreses in these states * to the dirty list, assuming that they will only be * dirty for a short while. */ + BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); if (res->state & (DLM_LOCK_RES_IN_PROGRESS | - DLM_LOCK_RES_MIGRATING | DLM_LOCK_RES_RECOVERING)) { /* move it to the tail and keep going */ + res->state &= ~DLM_LOCK_RES_DIRTY; spin_unlock(&res->spinlock); mlog(0, "delaying list shuffling for in-" "progress lockres %.*s, state=%d\n", @@ -687,6 +693,7 @@ static int dlm_thread(void *data) /* called while holding lockres lock */ dlm_shuffle_lists(dlm, res); + res->state &= ~DLM_LOCK_RES_DIRTY; spin_unlock(&res->spinlock); dlm_lockres_calc_usage(dlm, res); @@ -697,11 +704,8 @@ in_progress: /* if the lock was in-progress, stick * it on the back of the list */ if (delay) { - /* ref for dirty_list */ - dlm_lockres_get(res); spin_lock(&res->spinlock); - list_add_tail(&res->dirty, &dlm->dirty_list); - res->state |= DLM_LOCK_RES_DIRTY; + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); } dlm_lockres_put(res); -- cgit v1.2.1 From 71ac1062435ba2d58bf64817b47a6e44f316752e Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Fri, 5 Jan 2007 15:02:30 -0800 Subject: ocfs2_dlm: Make dlmunlock() wait for migration to complete dlmunlock() was not waiting for migration to complete before releasing locks on locally mastered locks. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmrecovery.c | 1 + fs/ocfs2/dlm/dlmunlock.c | 4 ++++ 2 files changed, 5 insertions(+) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index d011a2a22742..3057b65a4b8b 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1763,6 +1763,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, "with cookie %u:%llu!\n", dlm_get_lock_cookie_node(c), dlm_get_lock_cookie_seq(c)); + __dlm_print_one_lock_resource(res); BUG(); } BUG_ON(lock->ml.node != ml->node); diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 37be4b2e0d4a..3c8a250fcfec 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -147,6 +147,10 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, goto leave; } + if (res->state & DLM_LOCK_RES_MIGRATING) { + status = DLM_MIGRATING; + goto leave; + } /* see above for what the spec says about * LKM_CANCEL and the lock queue state */ -- cgit v1.2.1 From e17e75ecb86b8ce9b51b219b5348517561031f80 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Fri, 5 Jan 2007 15:04:49 -0800 Subject: ocfs2_dlm: Fix migrate lockres handler queue scanning The migrate lockres handler was only searching for its lock on migrated lockres on the expected queue. This could be problematic as the new master could have also issued a convert request during the migration and thus moved the lock to the convert queue. We now search for the lock on all three queues. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmrecovery.c | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 3057b65a4b8b..f93315c98871 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1708,10 +1708,11 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, { struct dlm_migratable_lock *ml; struct list_head *queue; + struct list_head *tmpq = NULL; struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; int ret = 0; - int i, bad; + int i, j, bad; struct list_head *iter; struct dlm_lock *lock = NULL; u8 from = O2NM_MAX_NODES; @@ -1738,6 +1739,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, lksb = NULL; queue = dlm_list_num_to_pointer(res, ml->list); + tmpq = NULL; /* if the lock is for the local node it needs to * be moved to the proper location within the queue. @@ -1747,11 +1749,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); spin_lock(&res->spinlock); - list_for_each(iter, queue) { - lock = list_entry (iter, struct dlm_lock, list); - if (lock->ml.cookie != ml->cookie) - lock = NULL; - else + for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) { + tmpq = dlm_list_idx_to_ptr(res, j); + list_for_each(iter, tmpq) { + lock = list_entry (iter, struct dlm_lock, list); + if (lock->ml.cookie != ml->cookie) + lock = NULL; + else + break; + } + if (lock) break; } @@ -1768,6 +1775,13 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, } BUG_ON(lock->ml.node != ml->node); + if (tmpq != queue) { + mlog(0, "lock was on %u instead of %u for %.*s\n", + j, ml->list, res->lockname.len, res->lockname.name); + spin_unlock(&res->spinlock); + continue; + } + /* see NOTE above about why we do not update * to match the master here */ -- cgit v1.2.1 From 1cd04dbe3364be71b93e3aaf4545daa1e261aaa1 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 14:53:37 -0800 Subject: ocfs2_dlm: Flush dlm workqueue before starting to migrate This is to prevent the condition in which a previously queued up assert master asserts after we start the migration. Now migration ensures the workqueue is flushed before proceeding with migrating the lock to another node. This condition is typically encountered during parallel umounts. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmmaster.c | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index a65a87726d6a..b36cce034ea0 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1507,10 +1507,11 @@ way_up_top: /* take care of the easy cases up front */ spin_lock(&res->spinlock); - if (res->state & DLM_LOCK_RES_RECOVERING) { + if (res->state & (DLM_LOCK_RES_RECOVERING| + DLM_LOCK_RES_MIGRATING)) { spin_unlock(&res->spinlock); mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " - "being recovered\n"); + "being recovered/migrated\n"); response = DLM_MASTER_RESP_ERROR; if (mle) kmem_cache_free(dlm_mle_cache, mle); @@ -2493,6 +2494,9 @@ fail: * the lockres */ + /* now that remote nodes are spinning on the MIGRATING flag, + * ensure that all assert_master work is flushed. */ + flush_workqueue(dlm->dlm_worker); /* get an extra reference on the mle. * otherwise the assert_master from the new @@ -2547,7 +2551,8 @@ fail: res->owner == target) break; - mlog(0, "timed out during migration\n"); + mlog(0, "%s:%.*s: timed out during migration\n", + dlm->name, res->lockname.len, res->lockname.name); /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { @@ -2555,20 +2560,19 @@ fail: "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); - ret = -ERESTARTSYS; + ret = -EINVAL; + /* migration failed, detach and clean up mle */ + dlm_mle_detach_hb_events(dlm, mle); + dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + spin_unlock(&res->spinlock); + goto leave; } - } - if (ret == -ERESTARTSYS) { - /* migration failed, detach and clean up mle */ - dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); - dlm_put_mle_inuse(mle); - spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - spin_unlock(&res->spinlock); - goto leave; - } - /* TODO: if node died: stop, clean up, return error */ + } else + mlog(0, "%s:%.*s: caught signal during migration\n", + dlm->name, res->lockname.len, res->lockname.name); } /* all done, set the owner, clear the flag */ -- cgit v1.2.1 From 50635f15b324cbf45a58f103e6b4c7e42502b683 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 14:54:39 -0800 Subject: ocfs2_dlm: Drop inflight refmap even if no locks found on the lockres Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmrecovery.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index f93315c98871..2e32fe65c6c2 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1908,11 +1908,9 @@ skip_lvb: leave: /* balance the ref taken when the work was queued */ - if (added > 0) { - spin_lock(&res->spinlock); - dlm_lockres_drop_inflight_ref(dlm, res); - spin_unlock(&res->spinlock); - } + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); if (ret < 0) { mlog_errno(ret); -- cgit v1.2.1 From 28b72d9c92ed43e01e4094f57bcad1814b002779 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 14:57:50 -0800 Subject: ocfs2_dlm: Dlm dispatch was stopping too early dlm_dispatch_work was not processing the queued up tasks at the first sign of the node leaving the domain leading to not only incompleted tasks but also a mismatch in the dlm refcnt. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmrecovery.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 2e32fe65c6c2..8c60ccc7460c 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work) dlm_workfunc_t *workfunc; int tot=0; - if (!dlm_joined(dlm)) - return; - spin_lock(&dlm->work_lock); list_splice_init(&dlm->work_list, &tmp_list); spin_unlock(&dlm->work_lock); -- cgit v1.2.1 From a6fa36402aba96362311318200d710ea1719e59b Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 14:59:12 -0800 Subject: ocfs2_dlm: wake up sleepers on the lockres waitqueue The dlm was not waking up threads waiting on the lockres wait queue, waiting for the lockres to be no longer be in the DLM_LOCK_RES_IN_PROGRESS and the DLM_LOCK_RES_MIGRATING states. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmconvert.c | 5 ++++- fs/ocfs2/dlm/dlmmaster.c | 14 +++++++++++++- fs/ocfs2/dlm/dlmrecovery.c | 1 + 3 files changed, 18 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index c764dc8e40a2..42c177444850 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -428,7 +428,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_lockstatus *lksb; enum dlm_status status = DLM_NORMAL; u32 flags; - int call_ast = 0, kick_thread = 0, ast_reserved = 0; + int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0; if (!dlm_grab(dlm)) { dlm_error(DLM_REJECTED); @@ -524,8 +524,11 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) cnv->requested_type, &call_ast, &kick_thread); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; + wake = 1; } spin_unlock(&res->spinlock); + if (wake) + wake_up(&res->wq); if (status != DLM_NORMAL) { if (status != DLM_NOTQUEUED) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index b36cce034ea0..6cfbdf282d46 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1967,6 +1967,7 @@ ok: spin_unlock(&mle->spinlock); if (res) { + int wake = 0; spin_lock(&res->spinlock); if (mle->type == DLM_MLE_MIGRATION) { mlog(0, "finishing off migration of lockres %.*s, " @@ -1974,6 +1975,7 @@ ok: res->lockname.len, res->lockname.name, dlm->node_num, mle->new_master); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; dlm_change_lockres_owner(dlm, res, mle->new_master); BUG_ON(res->state & DLM_LOCK_RES_DIRTY); } else { @@ -1981,6 +1983,8 @@ ok: } spin_unlock(&res->spinlock); have_lockres_ref = 1; + if (wake) + wake_up(&res->wq); } /* master is known, detach if not already detached. @@ -2342,7 +2346,7 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct list_head *queue, *iter; int i; struct dlm_lock *lock; - int empty = 1; + int empty = 1, wake = 0; if (!dlm_grab(dlm)) return -EINVAL; @@ -2467,6 +2471,7 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, res->lockname.name, target); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; spin_unlock(&res->spinlock); ret = -EINVAL; } @@ -2525,6 +2530,7 @@ fail: dlm_put_mle_inuse(mle); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; spin_unlock(&res->spinlock); goto leave; } @@ -2567,6 +2573,7 @@ fail: dlm_put_mle_inuse(mle); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; spin_unlock(&res->spinlock); goto leave; } @@ -2595,6 +2602,11 @@ leave: if (ret < 0) dlm_kick_thread(dlm, res); + /* wake up waiters if the MIGRATING flag got set + * but migration failed */ + if (wake) + wake_up(&res->wq); + /* TODO: cleanup */ if (mres) free_page((unsigned long)mres); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 8c60ccc7460c..e57636c399f4 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1420,6 +1420,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); + wake_up(&res->wq); /* add an extra ref for just-allocated lockres * otherwise the lockres will be purged immediately */ -- cgit v1.2.1 From 90aaaf1c235a70daee04e897e9501415b766de69 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 15:01:45 -0800 Subject: ocfs2_dlm: Silence a failed convert When the lockres is in migrate or recovery state, all convert requests are denied with the appropriate error status that is handled on the requester node. This patch silences the erroneous error message printed on the master node. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmconvert.c | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 42c177444850..370f23c385f1 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -479,25 +479,14 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } lock = NULL; } - if (!lock) { - __dlm_print_one_lock_resource(res); - list_for_each(iter, &res->granted) { - lock = list_entry(iter, struct dlm_lock, list); - if (lock->ml.node == cnv->node_idx) { - mlog(ML_ERROR, "There is something here " - "for node %u, lock->ml.cookie=%llu, " - "cnv->cookie=%llu\n", cnv->node_idx, - (unsigned long long)lock->ml.cookie, - (unsigned long long)cnv->cookie); - break; - } - } - lock = NULL; - } spin_unlock(&res->spinlock); if (!lock) { status = DLM_IVLOCKID; - dlm_error(status); + mlog(ML_ERROR, "did not find lock to convert on grant queue! " + "cookie=%u:%llu\n", + dlm_get_lock_cookie_node(cnv->cookie), + dlm_get_lock_cookie_seq(cnv->cookie)); + __dlm_print_one_lock_resource(res); goto leave; } @@ -537,12 +526,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } leave: - if (!lock) - mlog(ML_ERROR, "did not find lock to convert on grant queue! " - "cookie=%u:%llu\n", - dlm_get_lock_cookie_node(cnv->cookie), - dlm_get_lock_cookie_seq(cnv->cookie)); - else + if (lock) dlm_lock_put(lock); /* either queue the ast or release it, if reserved */ -- cgit v1.2.1 From 74aa25856c693d20a886cdb31a004aaca411d135 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 15:11:36 -0800 Subject: ocfs2_dlm: Cookies in locks not being printed correctly in error messages The dlm encodes the node number and a sequence number in the lock cookie. It also stores the cookie in the lockres in the big endian format to avoid swapping 8 bytes on each lock request. The bug here was that it was assuming the cookie to be in the cpu format when decoding it for printing the error message. This patch swaps the bytes before the print. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmast.c | 11 ++++++----- fs/ocfs2/dlm/dlmconvert.c | 8 ++++---- fs/ocfs2/dlm/dlmdebug.c | 12 ++++++------ fs/ocfs2/dlm/dlmrecovery.c | 12 ++++++------ fs/ocfs2/dlm/dlmunlock.c | 8 ++++---- 5 files changed, 26 insertions(+), 25 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c index 681046d51393..ad5e7e1fa1ff 100644 --- a/fs/ocfs2/dlm/dlmast.c +++ b/fs/ocfs2/dlm/dlmast.c @@ -311,8 +311,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) past->type != DLM_BAST) { mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu" "name=%.*s\n", past->type, - dlm_get_lock_cookie_node(cookie), - dlm_get_lock_cookie_seq(cookie), + dlm_get_lock_cookie_node(be64_to_cpu(cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(cookie)), locklen, name); ret = DLM_IVLOCKID; goto leave; @@ -323,8 +323,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "got %sast for unknown lockres! " "cookie=%u:%llu, name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b", - dlm_get_lock_cookie_node(cookie), - dlm_get_lock_cookie_seq(cookie), + dlm_get_lock_cookie_node(be64_to_cpu(cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(cookie)), locklen, name, locklen); ret = DLM_IVLOCKID; goto leave; @@ -369,7 +369,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "got %sast for unknown lock! cookie=%u:%llu, " "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b", - dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie), + dlm_get_lock_cookie_node(be64_to_cpu(cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(cookie)), locklen, name, locklen); ret = DLM_NORMAL; diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 370f23c385f1..59fb63da8b65 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -286,8 +286,8 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm, __dlm_print_one_lock_resource(res); mlog(ML_ERROR, "converting a remote lock that is already " "converting! (cookie=%u:%llu, conv=%d)\n", - dlm_get_lock_cookie_node(lock->ml.cookie), - dlm_get_lock_cookie_seq(lock->ml.cookie), + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), lock->ml.convert_type); status = DLM_DENIED; goto bail; @@ -484,8 +484,8 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) status = DLM_IVLOCKID; mlog(ML_ERROR, "did not find lock to convert on grant queue! " "cookie=%u:%llu\n", - dlm_get_lock_cookie_node(cnv->cookie), - dlm_get_lock_cookie_seq(cnv->cookie)); + dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie))); __dlm_print_one_lock_resource(res); goto leave; } diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index 1015cc7bf9cb..64239b37e5d4 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -90,8 +90,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", lock->ml.type, lock->ml.convert_type, lock->ml.node, - dlm_get_lock_cookie_node(lock->ml.cookie), - dlm_get_lock_cookie_seq(lock->ml.cookie), + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), list_empty(&lock->ast_list) ? 'y' : 'n', lock->ast_pending ? 'y' : 'n', list_empty(&lock->bast_list) ? 'y' : 'n', @@ -105,8 +105,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", lock->ml.type, lock->ml.convert_type, lock->ml.node, - dlm_get_lock_cookie_node(lock->ml.cookie), - dlm_get_lock_cookie_seq(lock->ml.cookie), + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), list_empty(&lock->ast_list) ? 'y' : 'n', lock->ast_pending ? 'y' : 'n', list_empty(&lock->bast_list) ? 'y' : 'n', @@ -120,8 +120,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", lock->ml.type, lock->ml.convert_type, lock->ml.node, - dlm_get_lock_cookie_node(lock->ml.cookie), - dlm_get_lock_cookie_seq(lock->ml.cookie), + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), list_empty(&lock->ast_list) ? 'y' : 'n', lock->ast_pending ? 'y' : 'n', list_empty(&lock->bast_list) ? 'y' : 'n', diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index e57636c399f4..38d714645309 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1766,8 +1766,8 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, u64 c = ml->cookie; mlog(ML_ERROR, "could not find local lock " "with cookie %u:%llu!\n", - dlm_get_lock_cookie_node(c), - dlm_get_lock_cookie_seq(c)); + dlm_get_lock_cookie_node(be64_to_cpu(c)), + dlm_get_lock_cookie_seq(be64_to_cpu(c))); __dlm_print_one_lock_resource(res); BUG(); } @@ -1876,14 +1876,14 @@ skip_lvb: mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " "exists on this lockres!\n", dlm->name, res->lockname.len, res->lockname.name, - dlm_get_lock_cookie_node(c), - dlm_get_lock_cookie_seq(c)); + dlm_get_lock_cookie_node(be64_to_cpu(c)), + dlm_get_lock_cookie_seq(be64_to_cpu(c))); mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " "node=%u, cookie=%u:%llu, queue=%d\n", ml->type, ml->convert_type, ml->node, - dlm_get_lock_cookie_node(ml->cookie), - dlm_get_lock_cookie_seq(ml->cookie), + dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)), ml->list); __dlm_print_one_lock_resource(res); diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 3c8a250fcfec..fc8baa3e9539 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -248,8 +248,8 @@ leave: /* this should always be coupled with list removal */ BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK)); mlog(0, "lock %u:%llu should be gone now! refs=%d\n", - dlm_get_lock_cookie_node(lock->ml.cookie), - dlm_get_lock_cookie_seq(lock->ml.cookie), + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), atomic_read(&lock->lock_refs.refcount)-1); dlm_lock_put(lock); } @@ -506,8 +506,8 @@ not_found: if (!found) mlog(ML_ERROR, "failed to find lock to unlock! " "cookie=%u:%llu\n", - dlm_get_lock_cookie_node(unlock->cookie), - dlm_get_lock_cookie_seq(unlock->cookie)); + dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie))); else dlm_lock_put(lock); -- cgit v1.2.1 From d74c9803a90d733f5fb7270475aa6d14b45796c6 Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 17:04:25 -0800 Subject: ocfs2: Added post handler callable function in o2net message handler Currently o2net allows one handler function per message type. This patch adds the ability to call another function to be called after the handler has returned the message to the other node. Handlers are now given the option of returning a context (in the form of a void **) which will be passed back into the post message handler function. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/tcp.c | 12 ++++++++- fs/ocfs2/cluster/tcp.h | 6 ++++- fs/ocfs2/cluster/tcp_internal.h | 2 ++ fs/ocfs2/dlm/dlmast.c | 3 ++- fs/ocfs2/dlm/dlmcommon.h | 42 +++++++++++++++++++---------- fs/ocfs2/dlm/dlmconvert.c | 3 ++- fs/ocfs2/dlm/dlmdomain.c | 60 +++++++++++++++++++++++------------------ fs/ocfs2/dlm/dlmlock.c | 3 ++- fs/ocfs2/dlm/dlmmaster.c | 12 ++++++--- fs/ocfs2/dlm/dlmrecovery.c | 18 ++++++++----- fs/ocfs2/dlm/dlmunlock.c | 3 ++- fs/ocfs2/vote.c | 8 +++--- 12 files changed, 112 insertions(+), 60 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index ae4ff4a6636b..7700418d25ec 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -688,6 +688,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh) * be given to the handler if their payload is longer than the max. */ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, o2net_msg_handler_func *func, void *data, + o2net_post_msg_handler_func *post_func, struct list_head *unreg_list) { struct o2net_msg_handler *nmh = NULL; @@ -722,6 +723,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, nmh->nh_func = func; nmh->nh_func_data = data; + nmh->nh_post_func = post_func; nmh->nh_msg_type = msg_type; nmh->nh_max_len = max_len; nmh->nh_key = key; @@ -1049,6 +1051,7 @@ static int o2net_process_message(struct o2net_sock_container *sc, int ret = 0, handler_status; enum o2net_system_error syserr; struct o2net_msg_handler *nmh = NULL; + void *ret_data = NULL; msglog(hdr, "processing message\n"); @@ -1101,7 +1104,7 @@ static int o2net_process_message(struct o2net_sock_container *sc, sc->sc_msg_type = be16_to_cpu(hdr->msg_type); handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len), - nmh->nh_func_data); + nmh->nh_func_data, &ret_data); do_gettimeofday(&sc->sc_tv_func_stop); out_respond: @@ -1112,6 +1115,13 @@ out_respond: mlog(0, "sending handler status %d, syserr %d returned %d\n", handler_status, syserr, ret); + if (nmh) { + BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL); + if (nmh->nh_post_func) + (nmh->nh_post_func)(handler_status, nmh->nh_func_data, + ret_data); + } + out: if (nmh) o2net_handler_put(nmh); diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h index 21a4e43df836..da880fc215f0 100644 --- a/fs/ocfs2/cluster/tcp.h +++ b/fs/ocfs2/cluster/tcp.h @@ -50,7 +50,10 @@ struct o2net_msg __u8 buf[0]; }; -typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data); +typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +typedef void (o2net_post_msg_handler_func)(int status, void *data, + void *ret_data); #define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) @@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec, int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, o2net_msg_handler_func *func, void *data, + o2net_post_msg_handler_func *post_func, struct list_head *unreg_list); void o2net_unregister_handler_list(struct list_head *list); diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index 775c911342f4..d74040fac343 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -161,6 +161,8 @@ struct o2net_msg_handler { u32 nh_key; o2net_msg_handler_func *nh_func; o2net_msg_handler_func *nh_func_data; + o2net_post_msg_handler_func + *nh_post_func; struct kref nh_kref; struct list_head nh_unregister_item; }; diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c index ad5e7e1fa1ff..241cad342a48 100644 --- a/fs/ocfs2/dlm/dlmast.c +++ b/fs/ocfs2/dlm/dlmast.c @@ -263,7 +263,8 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, -int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { int ret; unsigned int locklen; diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index e95ecb2aaf14..2df6fde3e652 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -707,16 +707,20 @@ void dlm_lock_put(struct dlm_lock *lock); void dlm_lock_attach_lockres(struct dlm_lock *lock, struct dlm_lock_resource *res); -int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data); +int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); void dlm_revert_pending_convert(struct dlm_lock_resource *res, struct dlm_lock *lock); void dlm_revert_pending_lock(struct dlm_lock_resource *res, struct dlm_lock *lock); -int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data); +int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); void dlm_commit_pending_cancel(struct dlm_lock_resource *res, struct dlm_lock *lock); void dlm_commit_pending_unlock(struct dlm_lock_resource *res, @@ -871,16 +875,26 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res); -int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data); -int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data); +int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 nodenum, u8 *real_master); diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 59fb63da8b65..ecb4d997221e 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -418,7 +418,8 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm, * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS, * status from __dlmconvert_master */ -int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf; diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 3995de360264..8a208b06fdd7 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -95,10 +95,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); #define DLM_DOMAIN_BACKOFF_MS 200 -static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data); -static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data); -static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data); -static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data); +static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); @@ -466,7 +470,8 @@ static void __dlm_print_nodes(struct dlm_ctxt *dlm) printk("\n"); } -static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data) +static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; unsigned int node; @@ -630,7 +635,8 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) } EXPORT_SYMBOL_GPL(dlm_unregister_domain); -static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) +static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_query_join_request *query; enum dlm_query_join_response response; @@ -707,7 +713,8 @@ respond: return response; } -static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) +static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_assert_joined *assert; struct dlm_ctxt *dlm = NULL; @@ -744,7 +751,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) return 0; } -static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data) +static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_cancel_join *cancel; struct dlm_ctxt *dlm = NULL; @@ -1086,105 +1094,105 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, sizeof(struct dlm_master_request), dlm_master_request_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, sizeof(struct dlm_assert_master), dlm_assert_master_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, sizeof(struct dlm_create_lock), dlm_create_lock_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, DLM_CONVERT_LOCK_MAX_LEN, dlm_convert_lock_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, DLM_UNLOCK_LOCK_MAX_LEN, dlm_unlock_lock_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, DLM_PROXY_AST_MAX_LEN, dlm_proxy_ast_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, sizeof(struct dlm_exit_domain), dlm_exit_domain_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, sizeof(struct dlm_deref_lockres), dlm_deref_lockres_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, sizeof(struct dlm_migrate_request), dlm_migrate_request_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, DLM_MIG_LOCKRES_MAX_LEN, dlm_mig_lockres_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, sizeof(struct dlm_master_requery), dlm_master_requery_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, sizeof(struct dlm_lock_request), dlm_request_all_locks_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, sizeof(struct dlm_reco_data_done), dlm_reco_data_done_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, sizeof(struct dlm_begin_reco), dlm_begin_reco_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, sizeof(struct dlm_finalize_reco), dlm_finalize_reco_handler, - dlm, &dlm->dlm_domain_handlers); + dlm, NULL, &dlm->dlm_domain_handlers); if (status) goto bail; @@ -1478,21 +1486,21 @@ static int dlm_register_net_handlers(void) status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, sizeof(struct dlm_query_join_request), dlm_query_join_handler, - NULL, &dlm_join_handlers); + NULL, NULL, &dlm_join_handlers); if (status) goto bail; status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, sizeof(struct dlm_assert_joined), dlm_assert_joined_handler, - NULL, &dlm_join_handlers); + NULL, NULL, &dlm_join_handlers); if (status) goto bail; status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, sizeof(struct dlm_cancel_join), dlm_cancel_join_handler, - NULL, &dlm_join_handlers); + NULL, NULL, &dlm_join_handlers); bail: if (status < 0) diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index ac91a76b1e78..52578d907d9a 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -441,7 +441,8 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, * held on exit: none * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED */ -int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 6cfbdf282d46..bd1268778b66 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -1469,7 +1469,8 @@ out: * * if possible, TRIM THIS DOWN!!! */ -int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { u8 response = DLM_MASTER_RESP_MAYBE; struct dlm_ctxt *dlm = data; @@ -1800,7 +1801,8 @@ again: * * if possible, TRIM THIS DOWN!!! */ -int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_master_list_entry *mle = NULL; @@ -2265,7 +2267,8 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) return ret; } -int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf; @@ -2948,7 +2951,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, * we will have no mle in the list to start with. now we can add an mle for * the migration and this should be the only one found for those scanning the * list. */ -int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_lock_resource *res = NULL; diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 38d714645309..6d4a83d50152 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -818,7 +818,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, } -int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; @@ -975,7 +976,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) } -int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; @@ -1331,7 +1333,8 @@ error: * do we spin? returning an error only delays the problem really */ -int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_migratable_lockres *mres = @@ -1624,7 +1627,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, /* this function cannot error, so unless the sending * or receiving of the message failed, the owner can * be trusted */ -int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; @@ -2600,7 +2604,8 @@ retry: return ret; } -int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; @@ -2728,7 +2733,8 @@ stage2: return ret; } -int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index fc8baa3e9539..86ca085ef324 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -383,7 +383,8 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID, * return value from dlmunlock_master */ -int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf; diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c index 0afd8b9af70f..f30e63b9910c 100644 --- a/fs/ocfs2/vote.c +++ b/fs/ocfs2/vote.c @@ -887,7 +887,7 @@ static inline int ocfs2_translate_response(int response) static int ocfs2_handle_response_message(struct o2net_msg *msg, u32 len, - void *data) + void *data, void **ret_data) { unsigned int response_id, node_num; int response_status; @@ -943,7 +943,7 @@ bail: static int ocfs2_handle_vote_message(struct o2net_msg *msg, u32 len, - void *data) + void *data, void **ret_data) { int status; struct ocfs2_super *osb = data; @@ -1007,7 +1007,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb) osb->net_key, sizeof(struct ocfs2_response_msg), ocfs2_handle_response_message, - osb, &osb->osb_net_handlers); + osb, NULL, &osb->osb_net_handlers); if (status) { mlog_errno(status); goto bail; @@ -1017,7 +1017,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb) osb->net_key, sizeof(struct ocfs2_vote_msg), ocfs2_handle_vote_message, - osb, &osb->osb_net_handlers); + osb, NULL, &osb->osb_net_handlers); if (status) { mlog_errno(status); goto bail; -- cgit v1.2.1 From 3b8118cffad224415c6f6f35abe7ca2a1d79c05a Mon Sep 17 00:00:00 2001 From: Kurt Hackel Date: Wed, 17 Jan 2007 17:05:53 -0800 Subject: ocfs2_dlm: Calling post handler function in assert master handler This patch prevents the dlm from sending the clear refmap message before the set refmap. We use the newly created post function handler routine to accomplish the task. Signed-off-by: Kurt Hackel Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmcommon.h | 2 ++ fs/ocfs2/dlm/dlmdomain.c | 3 ++- fs/ocfs2/dlm/dlmmaster.c | 24 +++++++++++++++++++++--- fs/ocfs2/dlm/dlmthread.c | 4 ++++ 4 files changed, 29 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 2df6fde3e652..3f554711efe5 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -224,6 +224,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm, #define DLM_LOCK_RES_MIGRATING 0x00000020 #define DLM_LOCK_RES_DROPPING_REF 0x00000040 #define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000 +#define DLM_LOCK_RES_SETREF_INPROG 0x00002000 /* max milliseconds to wait to sync up a network failure with a node death */ #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) @@ -879,6 +880,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); +void dlm_assert_master_post_handler(int status, void *data, void *ret_data); int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 8a208b06fdd7..6590e1bca23c 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -1101,7 +1101,8 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, sizeof(struct dlm_assert_master), dlm_assert_master_handler, - dlm, NULL, &dlm->dlm_domain_handlers); + dlm, dlm_assert_master_post_handler, + &dlm->dlm_domain_handlers); if (status) goto bail; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index bd1268778b66..84f36db8ada3 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -2036,8 +2036,12 @@ ok: done: ret = 0; - if (res) - dlm_lockres_put(res); + if (res) { + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + *ret_data = (void *)res; + } dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); @@ -2064,11 +2068,25 @@ kill: __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); - dlm_lockres_put(res); + *ret_data = (void *)res; dlm_put(dlm); return -EINVAL; } +void dlm_assert_master_post_handler(int status, void *data, void *ret_data) +{ + struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data; + + if (ret_data) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + dlm_lockres_put(res); + } + return; +} + int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int ignore_higher, u8 request_from, u32 flags) diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index 3b94e4dec351..8ffa0916eb86 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -176,6 +176,10 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm, res->lockname.name, master); if (!master) { + spin_lock(&res->spinlock); + /* This ensures that clear refmap is sent after the set */ + __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); + spin_unlock(&res->spinlock); /* drop spinlock to do messaging, retake below */ spin_unlock(&dlm->spinlock); /* clear our bit from the master's refmap, ignore errors */ -- cgit v1.2.1 From ab81afd30bc154bb1e8749e5aeeffe9b93c90834 Mon Sep 17 00:00:00 2001 From: Sunil Mushran Date: Mon, 29 Jan 2007 14:57:14 -0800 Subject: ocfs2: Binds listener to the configured ip address This patch binds the o2net listener to the configured ip address instead of INADDR_ANY for security. Fixes oss.oracle.com bugzilla#814. Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/tcp.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index 7700418d25ec..2021aec7cbbd 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -1805,13 +1805,13 @@ out: ready(sk, bytes); } -static int o2net_open_listening_sock(__be16 port) +static int o2net_open_listening_sock(__be32 addr, __be16 port) { struct socket *sock = NULL; int ret; struct sockaddr_in sin = { .sin_family = PF_INET, - .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, + .sin_addr = { .s_addr = (__force u32)addr }, .sin_port = (__force u16)port, }; @@ -1834,15 +1834,15 @@ static int o2net_open_listening_sock(__be16 port) sock->sk->sk_reuse = 1; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); if (ret < 0) { - mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", - ntohs(port), ret); + mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, " + "ret=%d\n", NIPQUAD(addr), ntohs(port), ret); goto out; } ret = sock->ops->listen(sock, 64); if (ret < 0) { - mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", - ntohs(port), ret); + mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n", + NIPQUAD(addr), ntohs(port), ret); } out: @@ -1875,7 +1875,8 @@ int o2net_start_listening(struct o2nm_node *node) return -ENOMEM; /* ? */ } - ret = o2net_open_listening_sock(node->nd_ipv4_port); + ret = o2net_open_listening_sock(node->nd_ipv4_address, + node->nd_ipv4_port); if (ret) { destroy_workqueue(o2net_wq); o2net_wq = NULL; -- cgit v1.2.1 From f3f854648de64c4b6f13f6f13113bc9525c621e5 Mon Sep 17 00:00:00 2001 From: Sunil Mushran Date: Mon, 29 Jan 2007 15:19:16 -0800 Subject: ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres Eventhough the set refmap bit message is sent before the clear refmap message, currently there is no guarentee that the set message will be handled before the clear. This patch prevents the clear refmap to be processed while the node is sending assert master messages to other nodes. (The set refmap message is sent as a response to the assert master request). Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmcommon.h | 6 +++ fs/ocfs2/dlm/dlmmaster.c | 99 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 94 insertions(+), 11 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 3f554711efe5..2f4f5d4edb07 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -180,6 +180,11 @@ struct dlm_assert_master_priv unsigned ignore_higher:1; }; +struct dlm_deref_lockres_priv +{ + struct dlm_lock_resource *deref_res; + u8 deref_node; +}; struct dlm_work_item { @@ -191,6 +196,7 @@ struct dlm_work_item struct dlm_request_all_locks_priv ral; struct dlm_mig_lockres_priv ml; struct dlm_assert_master_priv am; + struct dlm_deref_lockres_priv dl; } u; }; diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 84f36db8ada3..77e4e6169a0d 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -102,6 +102,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); static int dlm_do_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, void *nodemap, u32 flags); +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, @@ -1717,6 +1718,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm, unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + again: reassert = 0; @@ -1789,6 +1795,11 @@ again: if (reassert) goto again; + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + return ret; } @@ -2296,6 +2307,9 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, int ret = -EINVAL; u8 node; unsigned int hash; + struct dlm_work_item *item; + int cleared = 0; + int dispatch = 0; if (!dlm_grab(dlm)) return 0; @@ -2326,27 +2340,90 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&dlm->spinlock); spin_lock(&res->spinlock); - BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); - if (test_bit(node, res->refmap)) { - ret = 0; - dlm_lockres_clear_refmap_bit(node, res); - } else { - mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " - "but it is already dropped!\n", dlm->name, namelen, - name, node); - __dlm_print_one_lock_resource(res); + if (res->state & DLM_LOCK_RES_SETREF_INPROG) + dispatch = 1; + else { + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } } spin_unlock(&res->spinlock); - if (!ret) - dlm_lockres_calc_usage(dlm, res); + if (!dispatch) { + if (cleared) + dlm_lockres_calc_usage(dlm, res); + else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + ret = 0; + goto done; + } + + item = kzalloc(sizeof(*item), GFP_NOFS); + if (!item) { + ret = -ENOMEM; + mlog_errno(ret); + goto done; + } + + dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL); + item->u.dl.deref_res = res; + item->u.dl.deref_node = node; + + spin_lock(&dlm->work_lock); + list_add_tail(&item->list, &dlm->work_list); + spin_unlock(&dlm->work_lock); + + queue_work(dlm->dlm_worker, &dlm->dispatched_work); + return 0; + done: if (res) dlm_lockres_put(res); dlm_put(dlm); + return ret; } +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) +{ + struct dlm_ctxt *dlm; + struct dlm_lock_resource *res; + u8 node; + u8 cleared = 0; + + dlm = item->dlm; + res = item->u.dl.deref_res; + node = item->u.dl.deref_node; + + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } + spin_unlock(&res->spinlock); + + if (cleared) { + mlog(0, "%s:%.*s node %u ref dropped in dispatch\n", + dlm->name, res->lockname.len, res->lockname.name, node); + dlm_lockres_calc_usage(dlm, res); + } else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + + dlm_lockres_put(res); +} + /* * DLM_MIGRATE_LOCKRES -- cgit v1.2.1 From 1faf289454b9eeb6e463da3eee47f7009668370d Mon Sep 17 00:00:00 2001 From: Srinivas Eeda Date: Mon, 29 Jan 2007 15:31:35 -0800 Subject: ocfs2_dlm: disallow a domain join if node maps mismatch There is a small window where a joining node may not see the node(s) that just died but are still part of the domain. To fix this, we must disallow join requests if the joining node has a different node map. A new field node_map is added to dlm_query_join_request to send the current nodes nodemap along with join request. On the receiving end the nodes that are part of the cluster verifies if this new node sees all the nodes that are still part of the cluster. They disallow the join if the maps mismatch. Signed-off-by: Srinivas Eeda Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/tcp_internal.h | 5 +++- fs/ocfs2/dlm/dlmcommon.h | 4 +++ fs/ocfs2/dlm/dlmdomain.c | 54 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index d74040fac343..177927a8f007 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -38,6 +38,9 @@ * locking semantics of the file system using the protocol. It should * be somewhere else, I'm sure, but right now it isn't. * + * New in version 7: + * - DLM join domain includes the live nodemap + * * New in version 6: * - DLM lockres remote refcount fixes. * @@ -54,7 +57,7 @@ * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 6ULL +#define O2NET_PROTOCOL_VERSION 7ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 2f4f5d4edb07..e90b92f9ece1 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -625,12 +625,16 @@ struct dlm_begin_reco }; +#define BITS_PER_BYTE 8 +#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE) + struct dlm_query_join_request { u8 node_idx; u8 pad1[2]; u8 name_len; u8 domain[O2NM_MAX_NAME_LEN]; + u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)]; }; struct dlm_assert_joined diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 6590e1bca23c..19b57a6bcb1a 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -48,6 +48,36 @@ #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) #include "cluster/masklog.h" +/* + * ocfs2 node maps are array of long int, which limits to send them freely + * across the wire due to endianness issues. To workaround this, we convert + * long ints to byte arrays. Following 3 routines are helper functions to + * set/test/copy bits within those array of bytes + */ +static inline void byte_set_bit(u8 nr, u8 map[]) +{ + map[nr >> 3] |= (1UL << (nr & 7)); +} + +static inline int byte_test_bit(u8 nr, u8 map[]) +{ + return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0; +} + +static inline void byte_copymap(u8 dmap[], unsigned long smap[], + unsigned int sz) +{ + unsigned int nn; + + if (!sz) + return; + + memset(dmap, 0, ((sz + 7) >> 3)); + for (nn = 0 ; nn < sz; nn++) + if (test_bit(nn, smap)) + byte_set_bit(nn, dmap); +} + static void dlm_free_pagevec(void **vec, int pages) { while (pages--) @@ -641,6 +671,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, struct dlm_query_join_request *query; enum dlm_query_join_response response; struct dlm_ctxt *dlm = NULL; + u8 nodenum; query = (struct dlm_query_join_request *) msg->buf; @@ -664,6 +695,25 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, spin_lock(&dlm_domain_lock); dlm = __dlm_lookup_domain_full(query->domain, query->name_len); + if (!dlm) + goto unlock_respond; + + /* + * There is a small window where the joining node may not see the + * node(s) that just left but still part of the cluster. DISALLOW + * join request if joining node has different node map. + */ + nodenum=0; + while (nodenum < O2NM_MAX_NODES) { + if (test_bit(nodenum, dlm->domain_map)) { + if (!byte_test_bit(nodenum, query->node_map)) { + response = JOIN_DISALLOW; + goto unlock_respond; + } + } + nodenum++; + } + /* Once the dlm ctxt is marked as leaving then we don't want * to be put in someone's domain map. * Also, explicitly disallow joining at certain troublesome @@ -705,6 +755,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&dlm->spinlock); } +unlock_respond: spin_unlock(&dlm_domain_lock); respond: @@ -854,6 +905,9 @@ static int dlm_request_join(struct dlm_ctxt *dlm, join_msg.name_len = strlen(dlm->name); memcpy(join_msg.domain, dlm->name, join_msg.name_len); + /* copy live node map to join message */ + byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); + status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, sizeof(join_msg), node, &retval); if (status < 0 && status != -ENOPROTOOPT) { -- cgit v1.2.1 From e4968476a9bc5a6b30076076b4f3ce3e692e0d79 Mon Sep 17 00:00:00 2001 From: Sunil Mushran Date: Mon, 29 Jan 2007 15:37:02 -0800 Subject: ocfs2_dlm: Silence some messages during join domain These messages can easily be activated using the mlog infrastructure and don't need to be enabled by default. Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmdomain.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 19b57a6bcb1a..e8ecf8c3dbe7 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -707,6 +707,9 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, while (nodenum < O2NM_MAX_NODES) { if (test_bit(nodenum, dlm->domain_map)) { if (!byte_test_bit(nodenum, query->node_map)) { + mlog(0, "disallow join as node %u does not " + "have node %u in its nodemap\n", + query->node_idx, nodenum); response = JOIN_DISALLOW; goto unlock_respond; } @@ -732,15 +735,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, /* Disallow parallel joins. */ response = JOIN_DISALLOW; } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { - mlog(ML_NOTICE, "node %u trying to join, but recovery " + mlog(0, "node %u trying to join, but recovery " "is ongoing.\n", bit); response = JOIN_DISALLOW; } else if (test_bit(bit, dlm->recovery_map)) { - mlog(ML_NOTICE, "node %u trying to join, but it " + mlog(0, "node %u trying to join, but it " "still needs recovery.\n", bit); response = JOIN_DISALLOW; } else if (test_bit(bit, dlm->domain_map)) { - mlog(ML_NOTICE, "node %u trying to join, but it " + mlog(0, "node %u trying to join, but it " "is still in the domain! needs recovery?\n", bit); response = JOIN_DISALLOW; -- cgit v1.2.1 From 0dd82141b236ce36253e3056c6068ee3d5732196 Mon Sep 17 00:00:00 2001 From: Sunil Mushran Date: Mon, 29 Jan 2007 15:44:27 -0800 Subject: ocfs2_dlm: Add timeout to dlm join domain Currently the ocfs2 dlm has no timeout during dlm join domain. While this is not a problem in normal operation, this does become an issue if, say, the other node is refusing to let the node join the domain because of a stuck recovery. This patch adds a 90 sec timeout. Signed-off-by: Sunil Mushran Signed-off-by: Mark Fasheh --- fs/ocfs2/dlm/dlmdomain.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index e8ecf8c3dbe7..6087c4749fee 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -1264,6 +1264,8 @@ bail: static int dlm_join_domain(struct dlm_ctxt *dlm) { int status; + unsigned int backoff; + unsigned int total_backoff = 0; BUG_ON(!dlm); @@ -1295,18 +1297,27 @@ static int dlm_join_domain(struct dlm_ctxt *dlm) } do { - unsigned int backoff; status = dlm_try_to_join_domain(dlm); /* If we're racing another node to the join, then we * need to back off temporarily and let them * complete. */ +#define DLM_JOIN_TIMEOUT_MSECS 90000 if (status == -EAGAIN) { if (signal_pending(current)) { status = -ERESTARTSYS; goto bail; } + if (total_backoff > + msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) { + status = -ERESTARTSYS; + mlog(ML_NOTICE, "Timed out joining dlm domain " + "%s after %u msecs\n", dlm->name, + jiffies_to_msecs(total_backoff)); + goto bail; + } + /* * After you! * No, after you! @@ -1316,6 +1327,7 @@ static int dlm_join_domain(struct dlm_ctxt *dlm) */ backoff = (unsigned int)(jiffies & 0x3); backoff *= DLM_DOMAIN_BACKOFF_MS; + total_backoff += backoff; mlog(0, "backoff %d\n", backoff); msleep(backoff); } -- cgit v1.2.1 From f71aa8a55a0ae1a0d06c6079265d16502a678e8e Mon Sep 17 00:00:00 2001 From: Randy Dunlap Date: Thu, 25 Jan 2007 14:51:50 -0800 Subject: [PATCH] ocfs2: drop INET from Kconfig, not needed OCFS2: drop 'depends on INET' since local mounts are now allowed. Signed-off-by: Randy Dunlap Signed-off-by: Mark Fasheh --- fs/Kconfig | 1 - 1 file changed, 1 deletion(-) (limited to 'fs') diff --git a/fs/Kconfig b/fs/Kconfig index 8cd2417a14db..5e8e9d9ccb33 100644 --- a/fs/Kconfig +++ b/fs/Kconfig @@ -426,7 +426,6 @@ config OCFS2_FS select CONFIGFS_FS select JBD select CRC32 - select INET help OCFS2 is a general purpose extent based shared disk cluster file system with many similarities to ext3. It supports 64 bit inode -- cgit v1.2.1 From 925037bcba7691db2403684141a276930ad184f3 Mon Sep 17 00:00:00 2001 From: Zhen Wei Date: Tue, 23 Jan 2007 17:19:59 -0800 Subject: ocfs2: introduce sc->sc_send_lock to protect outbound outbound messages When there is a lot of multithreaded I/O usage, two threads can collide while sending out a message to the other nodes. This is due to the lack of locking between threads while sending out the messages. When a connected TCP send(), sendto(), or sendmsg() arrives in the Linux kernel, it eventually comes through tcp_sendmsg(). tcp_sendmsg() protects itself by acquiring a lock at invocation by calling lock_sock(). tcp_sendmsg() then loops over the buffers in the iovec, allocating associated sk_buff's and cache pages for use in the actual send. As it does so, it pushes the data out to tcp for actual transmission. However, if one of those allocation fails (because a large number of large sends is being processed, for example), it must wait for memory to become available. It does so by jumping to wait_for_sndbuf or wait_for_memory, both of which eventually cause a call to sk_stream_wait_memory(). sk_stream_wait_memory() contains a code path that calls sk_wait_event(). Finally, sk_wait_event() contains the call to release_sock(). The following patch adds a lock to the socket container in order to properly serialize outbound requests. From: Zhen Wei Acked-by: Jeff Mahoney Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/tcp.c | 8 ++++++++ fs/ocfs2/cluster/tcp_internal.h | 2 ++ 2 files changed, 10 insertions(+) (limited to 'fs') diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index 2021aec7cbbd..1718215fc018 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk, sk->sk_data_ready = o2net_data_ready; sk->sk_state_change = o2net_state_change; + mutex_init(&sc->sc_send_lock); + write_unlock_bh(&sk->sk_callback_lock); } @@ -858,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc, ssize_t ret; + mutex_lock(&sc->sc_send_lock); ret = sc->sc_sock->ops->sendpage(sc->sc_sock, virt_to_page(kmalloced_virt), (long)kmalloced_virt & ~PAGE_MASK, size, MSG_DONTWAIT); + mutex_unlock(&sc->sc_send_lock); if (ret != size) { mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret); @@ -976,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, /* finally, convert the message header to network byte-order * and send */ + mutex_lock(&sc->sc_send_lock); ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen, sizeof(struct o2net_msg) + caller_bytes); + mutex_unlock(&sc->sc_send_lock); msglog(msg, "sending returned %d\n", ret); if (ret < 0) { mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret); @@ -1109,8 +1115,10 @@ static int o2net_process_message(struct o2net_sock_container *sc, out_respond: /* this destroys the hdr, so don't use it after this */ + mutex_lock(&sc->sc_send_lock); ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, handler_status); + mutex_unlock(&sc->sc_send_lock); hdr = NULL; mlog(0, "sending handler status %d, syserr %d returned %d\n", handler_status, syserr, ret); diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index 177927a8f007..4dae5df5e467 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -155,6 +155,8 @@ struct o2net_sock_container { struct timeval sc_tv_func_stop; u32 sc_msg_key; u16 sc_msg_type; + + struct mutex sc_send_lock; }; struct o2net_msg_handler { -- cgit v1.2.1 From b559292e066f6d570cd5aa5dbd41de61dd04bdce Mon Sep 17 00:00:00 2001 From: Philipp Reisner Date: Thu, 11 Jan 2007 10:58:10 +0100 Subject: [PATCH] ocfs2 heartbeat: clean up bio submission code As was already pointed out Mathieu Avila on Thu, 07 Sep 2006 03:15:25 -0700 that OCFS2 is expecting bio_add_page() to add pages to BIOs in an easily predictable manner. That is not true, especially for devices with own merge_bvec_fn(). Therefore OCFS2's heartbeat code is very likely to fail on such devices. Move the bio_put() call into the bio's bi_end_io() function. This makes the whole idea of trying to predict the behaviour of bio_add_page() unnecessary. Removed compute_max_sectors() and o2hb_compute_request_limits(). Signed-off-by: Philipp Reisner Signed-off-by: Mark Fasheh --- fs/ocfs2/cluster/heartbeat.c | 158 +++++++++---------------------------------- 1 file changed, 31 insertions(+), 127 deletions(-) (limited to 'fs') diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 277ca67a2ad6..5a9779bb9236 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg) flush_scheduled_work(); } -static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, - unsigned int num_ios) +static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) { - atomic_set(&wc->wc_num_reqs, num_ios); + atomic_set(&wc->wc_num_reqs, 1); init_completion(&wc->wc_io_complete); wc->wc_error = 0; } @@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg, struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; blk_run_address_space(mapping); + o2hb_bio_wait_dec(wc, 1); wait_for_completion(&wc->wc_io_complete); } @@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio, return 1; o2hb_bio_wait_dec(wc, 1); + bio_put(bio); return 0; } @@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio, * start_slot. */ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc, - unsigned int start_slot, - unsigned int num_slots) + unsigned int *current_slot, + unsigned int max_slots) { - int i, nr_vecs, len, first_page, last_page; + int len, current_page; unsigned int vec_len, vec_start; unsigned int bits = reg->hr_block_bits; unsigned int spp = reg->hr_slots_per_page; + unsigned int cs = *current_slot; struct bio *bio; struct page *page; - nr_vecs = (num_slots + spp - 1) / spp; - /* Testing has shown this allocation to take long enough under * GFP_KERNEL that the local node can get fenced. It would be * nicest if we could pre-allocate these bios and avoid this * all together. */ - bio = bio_alloc(GFP_ATOMIC, nr_vecs); + bio = bio_alloc(GFP_ATOMIC, 16); if (!bio) { mlog(ML_ERROR, "Could not alloc slots BIO!\n"); bio = ERR_PTR(-ENOMEM); @@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, } /* Must put everything in 512 byte sectors for the bio... */ - bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9); + bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); bio->bi_bdev = reg->hr_bdev; bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; - first_page = start_slot / spp; - last_page = first_page + nr_vecs; - vec_start = (start_slot << bits) % PAGE_CACHE_SIZE; - for(i = first_page; i < last_page; i++) { - page = reg->hr_slot_data[i]; + vec_start = (cs << bits) % PAGE_CACHE_SIZE; + while(cs < max_slots) { + current_page = cs / spp; + page = reg->hr_slot_data[current_page]; - vec_len = PAGE_CACHE_SIZE; - /* last page might be short */ - if (((i + 1) * spp) > (start_slot + num_slots)) - vec_len = ((num_slots + start_slot) % spp) << bits; - vec_len -= vec_start; + vec_len = min(PAGE_CACHE_SIZE, + (max_slots-cs) * (PAGE_CACHE_SIZE/spp) ); mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", - i, vec_len, vec_start); + current_page, vec_len, vec_start); len = bio_add_page(bio, page, vec_len, vec_start); - if (len != vec_len) { - bio_put(bio); - bio = ERR_PTR(-EIO); - - mlog(ML_ERROR, "Error adding page to bio i = %d, " - "vec_len = %u, len = %d\n, start = %u\n", - i, vec_len, len, vec_start); - goto bail; - } + if (len != vec_len) break; + cs += vec_len / (PAGE_CACHE_SIZE/spp); vec_start = 0; } bail: + *current_slot = cs; return bio; } -/* - * Compute the maximum number of sectors the bdev can handle in one bio, - * as a power of two. - * - * Stolen from oracleasm, thanks Joel! - */ -static int compute_max_sectors(struct block_device *bdev) -{ - int max_pages, max_sectors, pow_two_sectors; - - struct request_queue *q; - - q = bdev_get_queue(bdev); - max_pages = q->max_sectors >> (PAGE_SHIFT - 9); - if (max_pages > BIO_MAX_PAGES) - max_pages = BIO_MAX_PAGES; - if (max_pages > q->max_phys_segments) - max_pages = q->max_phys_segments; - if (max_pages > q->max_hw_segments) - max_pages = q->max_hw_segments; - max_pages--; /* Handle I/Os that straddle a page */ - - if (max_pages) { - max_sectors = max_pages << (PAGE_SHIFT - 9); - } else { - /* If BIO contains 1 or less than 1 page. */ - max_sectors = q->max_sectors; - } - /* Why is fls() 1-based???? */ - pow_two_sectors = 1 << (fls(max_sectors) - 1); - - return pow_two_sectors; -} - -static inline void o2hb_compute_request_limits(struct o2hb_region *reg, - unsigned int num_slots, - unsigned int *num_bios, - unsigned int *slots_per_bio) -{ - unsigned int max_sectors, io_sectors; - - max_sectors = compute_max_sectors(reg->hr_bdev); - - io_sectors = num_slots << (reg->hr_block_bits - 9); - - *num_bios = (io_sectors + max_sectors - 1) / max_sectors; - *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9); - - mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This " - "device can handle %u sectors of I/O\n", io_sectors, num_slots, - max_sectors); - mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n", - *num_bios, *slots_per_bio); -} - static int o2hb_read_slots(struct o2hb_region *reg, unsigned int max_slots) { - unsigned int num_bios, slots_per_bio, start_slot, num_slots; - int i, status; + unsigned int current_slot=0; + int status; struct o2hb_bio_wait_ctxt wc; - struct bio **bios; struct bio *bio; - o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); + o2hb_bio_wait_init(&wc); - bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); - if (!bios) { - status = -ENOMEM; - mlog_errno(status); - return status; - } - - o2hb_bio_wait_init(&wc, num_bios); - - num_slots = slots_per_bio; - for(i = 0; i < num_bios; i++) { - start_slot = i * slots_per_bio; - - /* adjust num_slots at last bio */ - if (max_slots < (start_slot + num_slots)) - num_slots = max_slots - start_slot; - - bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots); + while(current_slot < max_slots) { + bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); if (IS_ERR(bio)) { - o2hb_bio_wait_dec(&wc, num_bios - i); - status = PTR_ERR(bio); mlog_errno(status); goto bail_and_wait; } - bios[i] = bio; + atomic_inc(&wc.wc_num_reqs); submit_bio(READ, bio); } @@ -403,38 +319,30 @@ bail_and_wait: if (wc.wc_error && !status) status = wc.wc_error; - if (bios) { - for(i = 0; i < num_bios; i++) - if (bios[i]) - bio_put(bios[i]); - kfree(bios); - } - return status; } static int o2hb_issue_node_write(struct o2hb_region *reg, - struct bio **write_bio, struct o2hb_bio_wait_ctxt *write_wc) { int status; unsigned int slot; struct bio *bio; - o2hb_bio_wait_init(write_wc, 1); + o2hb_bio_wait_init(write_wc); slot = o2nm_this_node(); - bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); + bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail; } + atomic_inc(&write_wc->wc_num_reqs); submit_bio(WRITE, bio); - *write_bio = bio; status = 0; bail: return status; @@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) { int i, ret, highest_node, change = 0; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; - struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; ret = o2nm_configured_node_map(configured_nodes, @@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) /* And fire off the write. Note that we don't wait on this I/O * until later. */ - ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); + ret = o2hb_issue_node_write(reg, &write_wc); if (ret < 0) { mlog_errno(ret); return ret; @@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * people we find in our steady state have seen us. */ o2hb_wait_on_io(reg, &write_wc); - bio_put(write_bio); if (write_wc.wc_error) { /* Do not re-arm the write timeout on I/O error - we * can't be sure that the new block ever made it to @@ -943,7 +849,6 @@ static int o2hb_thread(void *data) { int i, ret; struct o2hb_region *reg = data; - struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; struct timeval before_hb, after_hb; unsigned int elapsed_msec; @@ -993,10 +898,9 @@ static int o2hb_thread(void *data) * * XXX: Should we skip this on unclean_stop? */ o2hb_prepare_block(reg, 0); - ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); + ret = o2hb_issue_node_write(reg, &write_wc); if (ret == 0) { o2hb_wait_on_io(reg, &write_wc); - bio_put(write_bio); } else { mlog_errno(ret); } -- cgit v1.2.1 From ff05d1c4643dd4260eb699396043d7e8009c0de4 Mon Sep 17 00:00:00 2001 From: Joel Becker Date: Tue, 23 Jan 2007 17:00:45 -0800 Subject: configfs: Zero terminate data in configfs attribute writes. Attributes in configfs are text files. As such, most handlers expect to be able to call functions like simple_strtoul() without checking the bounds of the buffer. Change the call to zero terminate the buffer before calling the client's ->store() method. This does reduce the attribute size from PAGE_SIZE to PAGE_SIZE-1. Also, change get_zeroed_page() to alloc_page(), as we are handling the termination. Signed-off-by: Joel Becker Signed-off-by: Mark Fasheh --- fs/configfs/file.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/configfs/file.c b/fs/configfs/file.c index 2a7cb086e80c..d98be5e01328 100644 --- a/fs/configfs/file.c +++ b/fs/configfs/file.c @@ -162,14 +162,17 @@ fill_write_buffer(struct configfs_buffer * buffer, const char __user * buf, size int error; if (!buffer->page) - buffer->page = (char *)get_zeroed_page(GFP_KERNEL); + buffer->page = (char *)__get_free_pages(GFP_KERNEL, 0); if (!buffer->page) return -ENOMEM; - if (count > PAGE_SIZE) - count = PAGE_SIZE; + if (count >= PAGE_SIZE) + count = PAGE_SIZE - 1; error = copy_from_user(buffer->page,buf,count); buffer->needs_read_fill = 1; + /* if buf is assumed to contain a string, terminate it by \0, + * so e.g. sscanf() can scan the string easily */ + buffer->page[count] = 0; return error ? -EFAULT : count; } -- cgit v1.2.1