1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.zk;
19
20 import org.apache.curator.RetryPolicy;
21 import org.apache.curator.framework.CuratorFramework;
22 import org.apache.curator.framework.CuratorFrameworkFactory;
23 import org.apache.curator.retry.ExponentialBackoffRetry;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import java.io.IOException;
28 import java.util.concurrent.TimeUnit;
29
30 public class ZKUtils {
31
32 private static final Logger LOG = LoggerFactory.getLogger(ZKUtils.class);
33
34 public static CuratorFramework initZKClient(String zkCluster, String namespace, int zkConnectionTimeoutInSec)
35 throws IOException {
36
37 LOG.info("Creating Zookeeper Client connecting to {}", zkCluster);
38
39 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
40 CuratorFramework zkClient = CuratorFrameworkFactory.builder()
41 .namespace(namespace)
42 .connectString(zkCluster)
43 .retryPolicy(retryPolicy)
44 .build();
45
46 zkClient.start();
47
48 try {
49 if (zkClient.blockUntilConnected(zkConnectionTimeoutInSec, TimeUnit.SECONDS)) {
50 LOG.info("Connected to ZK cluster '{}', client in state: [{}]", zkCluster, zkClient.getState());
51 } else {
52 String errorMsg = String.format("Can't contact ZK cluster '%s' after %d seconds",
53 zkCluster, zkConnectionTimeoutInSec);
54 throw new IOException(errorMsg);
55 }
56 } catch (InterruptedException ex) {
57 throw new IOException(String.format("Interrupted whilst connecting to ZK cluster '%s'", zkCluster));
58 }
59
60 return zkClient;
61
62 }
63
64 }