ZookeeperNet太难用,写了个RetryHelper来进行配套使用

普通的zk用法,如下写法:

zk.Exists("/aaa", true);
zk.Create(...);

 

但是由于这些API会抛Zookeeper的Exception,比如ConnectionLossException, NoNodeException等,所以必须配合一堆try/catch的机制来catch错误,catch后再处理...

写起来很麻烦

 

因此写了个RetryHelper来封装上面这个try/catch行为,用起来也比较方便,如下:

RetryHelper helper=RetryHelper.Make();

helper.CreateNodeStructure = () => { Console.WriteLine("CreateNodeStructure"); };
helper.FixConnectionLossAction = () => { Console.WriteLine("FixConnectionLossAction");};
helper.IfErrorThen = () => { Console.WriteLine("IfErrorThen"); };
helper.Execute(() =>
{
     this.zk.GetChildren(...);
});

 

上面的意思是如果在Execute中,如果报错了,则会看报错的是哪种类型,如果是ConnectionLoss则执行FixConnectionLossAction委托,如果是NoNode则执行建立节点的委托

也就是将最常见的2个zookeeper动作给结构化了:建立节点目录结构以及连接丢失时的重新连接动作 

 

RetryHelper代码:

public class RetryHelper
    {
        private int retryDelay = 500;
        private long signal = 0;
        public Action IfErrorThen;
        public Action CreateNodeStructure;
        public Action FixConnectionLossAction;

        public static RetryHelper Make()
        {
            return new RetryHelper();
        }

        public void Execute(Action action)
        {
            while (true)
            {
                try
                {
                    action();
                    break;
                }
                catch (ZooKeeperNet.KeeperException.NoNodeException ex)
                {
                    //create node structure

                    Console.WriteLine("retry helper NoNodeException: " + ex.Message);

                    if (CreateNodeStructure != null)
                        RetryHelper.Make().Execute(CreateNodeStructure);
                    continue;
                }
                catch (ZooKeeperNet.KeeperException.ConnectionLossException ex)
                {
                    Console.WriteLine("retry helper ConnectionLossException: " + ex.Message);

                    long attempSignal = Interlocked.Read(ref signal);

                    while (Interlocked.Read(ref signal) > 0)
                        Thread.Sleep(retryDelay);

                    if (attempSignal == 0)
                    {
                        Interlocked.Increment(ref signal);

                        if (FixConnectionLossAction != null)
                            RetryHelper.Make().Execute(FixConnectionLossAction);

                        Interlocked.Decrement(ref signal);
                    }

                    continue;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("retry helper catch: " + ex.Message);
                    Thread.Sleep(retryDelay);

                    if (IfErrorThen != null)
                        IfErrorThen();
                    continue;
                }
            }
        }
    }

 

仔细看上面代码的朋友肯定也注意到里面catch connectionloss exception的代码块中使用了Interlocked,这是因为:在多线程系统侠,如果zk连接丢失了,由于多个地方都在尝试zk操作,所以会导致并发性的进入catch loss connection exception代码处理块,如果此时不加判断的处理所有并发请求,则会出现连接多次到zk,严重影响性能;因此,这里的代码实际上意图是将多次连接请求合并为一次连接。此处特别感谢我同事的code review,哈哈。

下面是个测试并发消除的demo,为了让结果清晰,我把RetryHelper的catch中的Console.WriteLine注释了

static void Main(string[] args)
        {
            RetryHelper helper=RetryHelper.Make();

            helper.CreateNodeStructure = () => { Console.WriteLine("CreateNodeStructure"); };
            helper.FixConnectionLossAction = () => 
            { 
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId+" FixConnectionLossAction BEGIN "+DateTime.Now.ToString());
                Thread.Sleep(2000);
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " FixConnectionLossAction END " + DateTime.Now.ToString());
            };
            helper.IfErrorThen = () => { Console.WriteLine("IfErrorThen"); };

            var tasks=new List<Task>();
            for (int i = 0; i < 10; i++)
            {
                var task = new Task(() => {
                    helper.Execute(() =>
                    {
                        throw new ZooKeeperNet.KeeperException.ConnectionLossException();
                    });
                });
                tasks.Add(task);
            }

            tasks.ForEach(t=>t.Start());

            Task.WaitAll(tasks.ToArray());

            Console.ReadKey();
        }

 

运行:

 

 

code download

 

 

 

posted @ 2014-07-08 23:11 McKay 阅读(...) 评论(...) 编辑 收藏