diff options
Diffstat (limited to 'fs/ocfs2/cluster/heartbeat.c')
-rw-r--r-- | fs/ocfs2/cluster/heartbeat.c | 194 |
1 files changed, 183 insertions, 11 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index a8d15beee5cb..636abcbd4650 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -272,10 +272,21 @@ struct o2hb_region { struct delayed_work hr_write_timeout_work; unsigned long hr_last_timeout_start; + /* negotiate timer, used to negotiate extending hb timeout. */ + struct delayed_work hr_nego_timeout_work; + unsigned long hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + /* Used during o2hb_check_slot to hold a copy of the block * being checked because we temporarily have to zero out the * crc field. */ struct o2hb_disk_heartbeat_block *hr_tmp_block; + + /* Message key for negotiate timeout message. */ + unsigned int hr_key; + struct list_head hr_handler_list; + + /* last hb status, 0 for success, other value for error. */ + int hr_last_hb_status; }; struct o2hb_bio_wait_ctxt { @@ -284,6 +295,17 @@ struct o2hb_bio_wait_ctxt { int wc_error; }; +#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2) + +enum { + O2HB_NEGO_TIMEOUT_MSG = 1, + O2HB_NEGO_APPROVE_MSG = 2, +}; + +struct o2hb_nego_msg { + u8 node_num; +}; + static void o2hb_write_timeout(struct work_struct *work) { int failed, quorum; @@ -319,7 +341,7 @@ static void o2hb_write_timeout(struct work_struct *work) o2quo_disk_timeout(); } -static void o2hb_arm_write_timeout(struct o2hb_region *reg) +static void o2hb_arm_timeout(struct o2hb_region *reg) { /* Arm writeout only after thread reaches steady state */ if (atomic_read(®->hr_steady_iterations) != 0) @@ -334,14 +356,132 @@ static void o2hb_arm_write_timeout(struct o2hb_region *reg) spin_unlock(&o2hb_live_lock); } cancel_delayed_work(®->hr_write_timeout_work); - reg->hr_last_timeout_start = jiffies; schedule_delayed_work(®->hr_write_timeout_work, msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); + + cancel_delayed_work(®->hr_nego_timeout_work); + /* negotiate timeout must be less than write timeout. */ + schedule_delayed_work(®->hr_nego_timeout_work, + msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS)); + memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap)); } -static void o2hb_disarm_write_timeout(struct o2hb_region *reg) +static void o2hb_disarm_timeout(struct o2hb_region *reg) { cancel_delayed_work_sync(®->hr_write_timeout_work); + cancel_delayed_work_sync(®->hr_nego_timeout_work); +} + +static int o2hb_send_nego_msg(int key, int type, u8 target) +{ + struct o2hb_nego_msg msg; + int status, ret; + + msg.node_num = o2nm_this_node(); +again: + ret = o2net_send_message(type, key, &msg, sizeof(msg), + target, &status); + + if (ret == -EAGAIN || ret == -ENOMEM) { + msleep(100); + goto again; + } + + return ret; +} + +static void o2hb_nego_timeout(struct work_struct *work) +{ + unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; + int master_node, i, ret; + struct o2hb_region *reg; + + reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work); + /* don't negotiate timeout if last hb failed since it is very + * possible io failed. Should let write timeout fence self. + */ + if (reg->hr_last_hb_status) + return; + + o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); + /* lowest node as master node to make negotiate decision. */ + master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0); + + if (master_node == o2nm_this_node()) { + if (!test_bit(master_node, reg->hr_nego_node_bitmap)) { + printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n", + o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, + config_item_name(®->hr_item), reg->hr_dev_name); + set_bit(master_node, reg->hr_nego_node_bitmap); + } + if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap, + sizeof(reg->hr_nego_node_bitmap))) { + /* check negotiate bitmap every second to do timeout + * approve decision. + */ + schedule_delayed_work(®->hr_nego_timeout_work, + msecs_to_jiffies(1000)); + + return; + } + + printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n", + config_item_name(®->hr_item), reg->hr_dev_name); + /* approve negotiate timeout request. */ + o2hb_arm_timeout(reg); + + i = -1; + while ((i = find_next_bit(live_node_bitmap, + O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { + if (i == master_node) + continue; + + mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i); + ret = o2hb_send_nego_msg(reg->hr_key, + O2HB_NEGO_APPROVE_MSG, i); + if (ret) + mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n", + i, ret); + } + } else { + /* negotiate timeout with master node. */ + printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n", + o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(®->hr_item), + reg->hr_dev_name, master_node); + ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG, + master_node); + if (ret) + mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n", + master_node, ret); + } +} + +static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) +{ + struct o2hb_region *reg = data; + struct o2hb_nego_msg *nego_msg; + + nego_msg = (struct o2hb_nego_msg *)msg->buf; + printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n", + nego_msg->node_num, config_item_name(®->hr_item), reg->hr_dev_name); + if (nego_msg->node_num < O2NM_MAX_NODES) + set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap); + else + mlog(ML_ERROR, "got nego timeout message from bad node.\n"); + + return 0; +} + +static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) +{ + struct o2hb_region *reg = data; + + printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n", + config_item_name(®->hr_item), reg->hr_dev_name); + o2hb_arm_timeout(reg); + return 0; } static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) @@ -390,7 +530,8 @@ static void o2hb_bio_end_io(struct bio *bio) static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc, unsigned int *current_slot, - unsigned int max_slots) + unsigned int max_slots, int op, + int op_flags) { int len, current_page; unsigned int vec_len, vec_start; @@ -416,6 +557,7 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, bio->bi_bdev = reg->hr_bdev; bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; + bio_set_op_attrs(bio, op, op_flags); vec_start = (cs << bits) % PAGE_SIZE; while(cs < max_slots) { @@ -451,7 +593,8 @@ static int o2hb_read_slots(struct o2hb_region *reg, o2hb_bio_wait_init(&wc); while(current_slot < max_slots) { - bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); + bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots, + REQ_OP_READ, 0); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); @@ -459,7 +602,7 @@ static int o2hb_read_slots(struct o2hb_region *reg, } atomic_inc(&wc.wc_num_reqs); - submit_bio(READ, bio); + submit_bio(bio); } status = 0; @@ -483,7 +626,8 @@ static int o2hb_issue_node_write(struct o2hb_region *reg, slot = o2nm_this_node(); - bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); + bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE, + WRITE_SYNC); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); @@ -491,7 +635,7 @@ static int o2hb_issue_node_write(struct o2hb_region *reg, } atomic_inc(&write_wc->wc_num_reqs); - submit_bio(WRITE_SYNC, bio); + submit_bio(bio); status = 0; bail: @@ -1032,7 +1176,8 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) /* Skip disarming the timeout if own slot has stale/bad data */ if (own_slot_ok) { o2hb_set_quorum_device(reg); - o2hb_arm_write_timeout(reg); + o2hb_arm_timeout(reg); + reg->hr_last_timeout_start = jiffies; } bail: @@ -1096,6 +1241,7 @@ static int o2hb_thread(void *data) before_hb = ktime_get_real(); ret = o2hb_do_disk_heartbeat(reg); + reg->hr_last_hb_status = ret; after_hb = ktime_get_real(); @@ -1114,7 +1260,7 @@ static int o2hb_thread(void *data) } } - o2hb_disarm_write_timeout(reg); + o2hb_disarm_timeout(reg); /* unclean stop is only used in very bad situation */ for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) @@ -1451,6 +1597,7 @@ static void o2hb_region_release(struct config_item *item) list_del(®->hr_all_item); spin_unlock(&o2hb_live_lock); + o2net_unregister_handler_list(®->hr_handler_list); kfree(reg); } @@ -1762,6 +1909,7 @@ static ssize_t o2hb_region_dev_store(struct config_item *item, } INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); + INIT_DELAYED_WORK(®->hr_nego_timeout_work, o2hb_nego_timeout); /* * A node is considered live after it has beat LIVE_THRESHOLD @@ -1995,13 +2143,37 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g config_item_init_type_name(®->hr_item, name, &o2hb_region_type); + /* this is the same way to generate msg key as dlm, for local heartbeat, + * name is also the same, so make initial crc value different to avoid + * message key conflict. + */ + reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS, + name, strlen(name)); + INIT_LIST_HEAD(®->hr_handler_list); + ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key, + sizeof(struct o2hb_nego_msg), + o2hb_nego_timeout_handler, + reg, NULL, ®->hr_handler_list); + if (ret) + goto free; + + ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key, + sizeof(struct o2hb_nego_msg), + o2hb_nego_approve_handler, + reg, NULL, ®->hr_handler_list); + if (ret) + goto unregister_handler; + ret = o2hb_debug_region_init(reg, o2hb_debug_dir); if (ret) { config_item_put(®->hr_item); - goto free; + goto unregister_handler; } return ®->hr_item; + +unregister_handler: + o2net_unregister_handler_list(®->hr_handler_list); free: kfree(reg); return ERR_PTR(ret); |