HBase Application Development 00

The most complete and detailed explanation of Hadoop, HBase, and Hive version compatibility (applies to any version):
http://www.aboutyun.com/blog-61-2938.html
Creating an HBase table:

package com.chb.hbT;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class HBaseTestCase {
    // Declare a static configuration built by HBaseConfiguration
    static Configuration cfg = HBaseConfiguration.create();

    /**
     * Create a table, equivalent to the shell command:
     * create 'tableName', 'columnFamily'
     */
    public static void createTable(String tableName, String cf) {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(cfg);
            if (admin.tableExists(tableName)) {
                // The table already exists
                System.out.print("Table Exists!");
            } else {
                // The table does not exist, so create it
                HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                hTableDescriptor.addFamily(new HColumnDescriptor(cf));
                admin.createTable(hTableDescriptor);
                System.out.println("Create table success!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
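For completeness, a minimal driver for the class above might look like the sketch below. The table name "scores" and column family "info" are placeholders, and the sketch assumes an hbase-site.xml pointing at your cluster is on the classpath.

package com.chb.hbT;

public class CreateTableDemo {
    public static void main(String[] args) {
        // Equivalent to the HBase shell command: create 'scores', 'info'
        HBaseTestCase.createTable("scores", "info");
    }
}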





Creating a Configuration
Let's first look at HBaseConfiguration. Its static create() method builds a Configuration and loads HBase's configuration files:
public static Configuration create() {
    Configuration conf = new Configuration();
    conf.setClassLoader(HBaseConfiguration.class.getClassLoader());
    return addHbaseResources(conf);
}
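Settings that are not present in the loaded files can also be supplied programmatically after create(). A minimal sketch, where the quorum value is just a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CreateConfDemo {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Override or supply values programmatically; "node1,node2,node3" is a placeholder quorum.
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        System.out.println(conf.get("hbase.zookeeper.quorum"));
    }
}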

Loading the configuration files
The configuration files are loaded in addHbaseResources(conf). Looking inside the method, it loads hbase-default.xml and then hbase-site.xml, and settings in the latter override those in the former:
public static Configuration addHbaseResources(Configuration conf) {
    // Load the configuration resources
    conf.addResource("hbase-default.xml");
    conf.addResource("hbase-site.xml");
    // Check the version: compare against hbase.defaults.for.version
    checkDefaultsVersion(conf);
    // Check the heap split between MemStore and BlockCache
    checkForClusterFreeMemoryLimit(conf);
    return conf;
}
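The override behavior comes from Hadoop's Configuration itself: for the same key, a resource added later wins over one added earlier. A small sketch of that behavior (the property shown is only an example):

import org.apache.hadoop.conf.Configuration;

public class OverrideOrderDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.addResource("hbase-default.xml"); // shipped defaults, e.g. hbase.cluster.distributed=false
        conf.addResource("hbase-site.xml");    // user settings, e.g. hbase.cluster.distributed=true
        // The value from hbase-site.xml is returned because it was added last.
        System.out.println(conf.get("hbase.cluster.distributed"));
    }
}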

Checking the version
checkDefaultsVersion(conf) verifies the HBase version. If hbase.defaults.for.version.skip in hbase-site.xml is set to true (a common setting), the version check is skipped, i.e. hbase.defaults.for.version is not compared.
private static void checkDefaultsVersion(Configuration conf) {
    if (conf.getBoolean("hbase.defaults.for.version.skip", Boolean.FALSE.booleanValue()))
        return;
    String defaultsVersion = conf.get("hbase.defaults.for.version");
    String thisVersion = VersionInfo.getVersion();
    if (!thisVersion.equals(defaultsVersion)) {
        throw new RuntimeException("hbase-default.xml file seems to be for and old version of HBase (" + defaultsVersion + "), this version is " + thisVersion);
    }
}
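If you prefer to skip the check programmatically instead of through hbase-site.xml, the flag can be set on the Configuration before the resources are loaded. A sketch under that assumption (values set via setBoolean() live in the Configuration overlay and survive resource loading):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SkipVersionCheckDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setClassLoader(HBaseConfiguration.class.getClassLoader());
        // Programmatic equivalent of hbase.defaults.for.version.skip=true in hbase-site.xml
        conf.setBoolean("hbase.defaults.for.version.skip", true);
        HBaseConfiguration.addHbaseResources(conf); // checkDefaultsVersion() now returns immediately
    }
}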

Checking free memory
Reference: http://blog.csdn.net/huoyunshen88/article/details/9169077
hbase.regionserver.global.memstore.upperLimit / lowerLimit
Default: 0.4 / 0.35
upperLimit: hbase.hregion.memstore.flush.size flushes all memstores of a region once their combined size exceeds the configured value. RegionServer flushes are handled asynchronously: flush requests are put on a queue and consumed in a producer-consumer fashion. The problem is that when the queue cannot keep up and a large backlog of requests builds, memory can spike sharply and, in the worst case, trigger an OOM.
upperLimit protects against excessive memory usage: when the memstores of all regions on a RegionServer together reach 40% of the heap, HBase blocks all updates and flushes these regions to release the memory held by their memstores.
lowerLimit: same idea as upperLimit, except that when the memstores of all regions reach 35% of the heap it does not flush every memstore. Instead it picks the region whose memstores use the most memory and flushes only that one; writes are still blocked while this happens. lowerLimit is a remedial step taken before a forced flush of all regions drags performance down. In the logs it appears as "** Flush thread woke up with memory above low water."
Tuning: this is a heap-protection parameter and the default suits most scenarios.
Changing it affects both reads and writes. If heavy write pressure frequently pushes memory past this threshold, shrink the read cache hfile.block.cache.size and raise this threshold, or, if there is plenty of heap to spare, raise the threshold without touching the read cache.
If the threshold is not reached even under heavy load, consider lowering it slightly and re-running the stress test; make sure it does not trigger too often, and if there is still plenty of spare heap, raise hfile.block.cache.size to improve read performance.
Another possibility is that hbase.hregion.memstore.flush.size stays unchanged but the RegionServer hosts too many regions; keep in mind that the number of regions directly determines how much memory is used.
hfile.block.cache.size
Default: 0.2
Description: the percentage of the heap used as the StoreFile read cache; 0.2 means 20%. This value directly affects read performance.
Tuning: bigger is generally better. If writes are far fewer than reads, 0.4-0.5 is fine; if reads and writes are roughly balanced, about 0.3; if writes dominate, stick with the default. When setting this value, also look at hbase.regionserver.global.memstore.upperLimit, the maximum share of the heap that memstores may use: one parameter affects reads, the other writes. If the two add up to more than 80-90% of the heap there is a risk of OOM, so set them carefully.
private static void checkForClusterFreeMemoryLimit(Configuration conf) {
    float globalMemstoreLimit = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4F);
    int gml = (int)(globalMemstoreLimit * 100.0F);
    float blockCacheUpperLimit = conf.getFloat("hfile.block.cache.size", 0.25F);
    int bcul = (int)(blockCacheUpperLimit * 100.0F);
    if (100 - (gml + bcul) < 20) {
        throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds the threshold required for successful cluster operation. The combined value cannot exceed 0.8. Please check the settings for hbase.regionserver.global.memstore.upperLimit and hfile.block.cache.size in your configuration. hbase.regionserver.global.memstore.upperLimit is " + globalMemstoreLimit + " hfile.block.cache.size is " + blockCacheUpperLimit);
    }
}
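To make the arithmetic concrete: with the defaults used in this check (0.4 for memstores plus 0.25 for the block cache), 35% of the heap is left free and the check passes; any combination that leaves less than 20% free makes addHbaseResources() throw. A hypothetical sketch that trips the check (the values are deliberately too large):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MemoryLimitDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setFloat("hbase.regionserver.global.memstore.upperLimit", 0.5F);
        conf.setFloat("hfile.block.cache.size", 0.4F);
        // 100 - (50 + 40) = 10 < 20, so this throws the RuntimeException shown above.
        HBaseConfiguration.addHbaseResources(conf);
    }
}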

HBaseConfiguration source code:
package com.chb.hbT;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;

public class HBaseConfiguration extends Configuration {
    private static final Log LOG = LogFactory.getLog(HBaseConfiguration.class);
    private static final int CONVERT_TO_PERCENTAGE = 100;

    @Deprecated
    public HBaseConfiguration() {
        addHbaseResources(this);
        LOG.warn("instantiating HBaseConfiguration() is deprecated. Please use HBaseConfiguration#create() to construct a plain Configuration");
    }

    @Deprecated
    public HBaseConfiguration(Configuration c) {
        this();
        merge(this, c);
    }

    private static void checkDefaultsVersion(Configuration conf) {
        if (conf.getBoolean("hbase.defaults.for.version.skip", Boolean.FALSE.booleanValue()))
            return;
        String defaultsVersion = conf.get("hbase.defaults.for.version");
        String thisVersion = VersionInfo.getVersion();
        if (!thisVersion.equals(defaultsVersion)) {
            throw new RuntimeException("hbase-default.xml file seems to be for and old version of HBase (" + defaultsVersion + "), this version is " + thisVersion);
        }
    }

    private static void checkForClusterFreeMemoryLimit(Configuration conf) {
        float globalMemstoreLimit = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4F);
        int gml = (int)(globalMemstoreLimit * 100.0F);
        float blockCacheUpperLimit = conf.getFloat("hfile.block.cache.size", 0.25F);
        int bcul = (int)(blockCacheUpperLimit * 100.0F);
        if (100 - (gml + bcul) < 20) {
            throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds the threshold required for successful cluster operation. The combined value cannot exceed 0.8. Please check the settings for hbase.regionserver.global.memstore.upperLimit and hfile.block.cache.size in your configuration. hbase.regionserver.global.memstore.upperLimit is " + globalMemstoreLimit + " hfile.block.cache.size is " + blockCacheUpperLimit);
        }
    }

    public static Configuration addHbaseResources(Configuration conf) {
        conf.addResource("hbase-default.xml");
        conf.addResource("hbase-site.xml");
        checkDefaultsVersion(conf);
        checkForClusterFreeMemoryLimit(conf);
        return conf;
    }

    // Create the configuration with create(); as noted above, the HBaseConfiguration constructors are deprecated.
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.setClassLoader(HBaseConfiguration.class.getClassLoader());
        return addHbaseResources(conf);
    }

    public static Configuration create(Configuration that) {
        Configuration conf = create();
        merge(conf, that);
        return conf;
    }

    public static void merge(Configuration destConf, Configuration srcConf) {
        for (Map.Entry e : srcConf) {
            destConf.set((String)e.getKey(), (String)e.getValue());
        }
    }

    public static Configuration subset(Configuration srcConf, String prefix) {
        Configuration newConf = new Configuration(false);
        for (Map.Entry entry : srcConf) {
            if (((String)entry.getKey()).startsWith(prefix)) {
                String newKey = ((String)entry.getKey()).substring(prefix.length());
                if (!newKey.isEmpty()) {
                    newConf.set(newKey, (String)entry.getValue());
                }
            }
        }
        return newConf;
    }

    public static void setWithPrefix(Configuration conf, String prefix, Iterable properties) {
        for (Map.Entry entry : properties) {
            conf.set(prefix + (String)entry.getKey(), (String)entry.getValue());
        }
    }

    public static boolean isShowConfInServlet() {
        boolean isShowConf = false;
        try {
            if (Class.forName("org.apache.hadoop.conf.ConfServlet") != null) {
                isShowConf = true;
            }
        } catch (LinkageError e) {
            LOG.warn("Error thrown: ", e);
        } catch (ClassNotFoundException ce) {
            LOG.debug("ClassNotFound: ConfServlet");
        }
        return isShowConf;
    }

    public static int getInt(Configuration conf, String name, String deprecatedName, int defaultValue) {
        if (conf.get(deprecatedName) != null) {
            LOG.warn(String.format("Config option \"%s\" is deprecated. Instead, use \"%s\"", new Object[] { deprecatedName, name }));
            return conf.getInt(deprecatedName, defaultValue);
        }
        return conf.getInt(name, defaultValue);
    }

    public static String getPassword(Configuration conf, String alias, String defPass) throws IOException {
        String passwd = null;
        try {
            Method m = Configuration.class.getMethod("getPassword", new Class[] { String.class });
            char[] p = (char[])m.invoke(conf, new Object[] { alias });
            if (p != null) {
                LOG.debug(String.format("Config option \"%s\" was found through the Configuration getPassword method.", new Object[] { alias }));
                passwd = new String(p);
            } else {
                LOG.debug(String.format("Config option \"%s\" was not found. Using provided default value", new Object[] { alias }));
                passwd = defPass;
            }
        } catch (NoSuchMethodException e) {
            LOG.debug(String.format("Credential.getPassword method is not available. Falling back to configuration.", new Object[0]));
            passwd = conf.get(alias, defPass);
        } catch (SecurityException e) {
            throw new IOException(e.getMessage(), e);
        } catch (IllegalAccessException e) {
            throw new IOException(e.getMessage(), e);
        } catch (IllegalArgumentException e) {
            throw new IOException(e.getMessage(), e);
        } catch (InvocationTargetException e) {
            throw new IOException(e.getMessage(), e);
        }
        return passwd;
    }

    public static Configuration createClusterConf(Configuration baseConf, String clusterKey) throws IOException {
        return createClusterConf(baseConf, clusterKey, null);
    }

    public static Configuration createClusterConf(Configuration baseConf, String clusterKey, String overridePrefix) throws IOException {
        Configuration clusterConf = create(baseConf);
        if ((clusterKey != null) && (!clusterKey.isEmpty())) {
            applyClusterKeyToConf(clusterConf, clusterKey);
        }
        if ((overridePrefix != null) && (!overridePrefix.isEmpty())) {
            Configuration clusterSubset = subset(clusterConf, overridePrefix);
            merge(clusterConf, clusterSubset);
        }
        return clusterConf;
    }

    private static void applyClusterKeyToConf(Configuration conf, String key) throws IOException {
        ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
        conf.set("hbase.zookeeper.quorum", zkClusterKey.getQuorumString());
        conf.setInt("hbase.zookeeper.property.clientPort", zkClusterKey.getClientPort());
        conf.set("zookeeper.znode.parent", zkClusterKey.getZnodeParent());
    }

    public static void main(String[] args) throws Exception {
        create().writeXml(System.out);
    }
}
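As a closing example, createClusterConf() from the listing above lets a client point at another cluster via a ZooKeeper cluster key of the form quorum:clientPort:znodeParent. A minimal sketch, assuming an HBase version that ships this method (the host names are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ClusterConfDemo {
    public static void main(String[] args) throws Exception {
        Configuration base = HBaseConfiguration.create();
        // Cluster key format: <quorum>:<clientPort>:<znodeParent>; zk1,zk2,zk3 are placeholders.
        Configuration remote = HBaseConfiguration.createClusterConf(base, "zk1,zk2,zk3:2181:/hbase");
        System.out.println(remote.get("hbase.zookeeper.quorum"));  // zk1,zk2,zk3
        System.out.println(remote.get("zookeeper.znode.parent"));  // /hbase
    }
}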
