hive thrift demo001

import org.apache.hadoop.hive.common.auth.HiveAuthUtils; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.rpc.thrift.*; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.Subject; import javax.security.sasl.SaslException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.SQLException; import java.util.*; import java.util.concurrent.locks.ReentrantLock; /** * HiveConnection. * */ public class HiveXConn { public static final Logger LOG = LoggerFactory.getLogger(HiveXConn.class.getName()); private String jdbcUriString; private String host; private int port; private JdbcConnectionParams connParams; private TTransport transport; // TODO should be replaced by CliServiceClient private TCLIService.Iface client; private TSessionHandle sessHandle = null; private int loginTimeout = 0; private TProtocolVersion protocol; private int fetchSize = 1000; public HiveXConn(String uri, Properties info) throws Exception {// JDBC URL: jdbc:hive2://:【hive thrift demo001】/dbName; sess_var_list?hive_conf_list#hive_var_list // each list: =; = and so on // sess_var_list -> sessConfMap // hive_conf_list -> hiveConfMap // hive_var_list -> hiveVarMap host = "192.168.11.9"; port = 10000; // open the client transport openTransport(); // set up the client client = new TCLIService.Client(new TBinaryProtocol(transport)); // open client session openSession(); client = newSynchronizedClient(client); }public static TCLIService.Iface newSynchronizedClient( TCLIService.Iface client) { return (TCLIService.Iface) Proxy.newProxyInstance( org.apache.hive.jdbc.HiveConnection.class.getClassLoader(), new Class [] { TCLIService.Iface.class }, new SynchronizedHandler(client)); }private void openTransport() throws Exception {transport =createBinaryTransport(); if (!transport.isOpen()) { transport.open(); } }//https://github.com/ds112/hbase-on-windows/blob/77e5f31715f3b4a258f212b242cd698ad983af60/Samples/Java/Hive/ThriftAPI/src/main/java/Client.javaprivate TTransport createBinaryTransport() throws SaslException { // we are using PLAIN Sasl connection with user/password String userName = "hive"; String passwd = "hive"; TTransportsocketTransport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout); transport = PlainSaslHelper.getPlainTransport(userName, passwd, socketTransport); return transport; }private void openSession() throws SQLException { TOpenSessionReq openReq = new TOpenSessionReq(); Map openConf = new HashMap(); openConf.put("use:database", "default"); // set the fetchSize openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", Integer.toString(fetchSize)); openReq.setConfiguration(openConf); try { TOpenSessionResp openResp = client.OpenSession(openReq); protocol = openResp.getServerProtocolVersion(); sessHandle = openResp.getSessionHandle(); TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, "SELECT * FROM test limit 10"); //TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, "show tables"); TExecuteStatementResp execResp = client.ExecuteStatement(execReq); TOperationHandle stmtHandle = execResp.getOperationHandle(); TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, TFetchOrientation.FETCH_FIRST, 100); TFetchResultsResp resultsResp = client.FetchResults(fetchReq); TRowSet resultsSet = resultsResp.getResults(); List tableResult = new ArrayList(); if (resultsSet.getColumns() != null && resultsSet.getColumns().size() != 0) { List resultColumns = resultsSet.getColumns(); for(int i=0; i

    推荐阅读