mongosql

import sqlparse from collections import namedtuple""" contextManager """def split(s: str): """ it is used to split SQL string to tuple. :param s: sql String :return: tuple """ res = list(filter(lambda x: x != "" and x != "; ", s.replace("\n", "").split(" "))) if len(res) == 0: return tuple() else: res[-1] = res[-1].rstrip("; ") return tuple(map(lambda x: x.lower(), res))def sqltuple2json(t: tuple): from_tag = "from" table_name = t[t.index(from_tag)+1] if from_tag in t else ""print(table_name)SqlType = { "select": "select", "create": "create", "drop": "drop", "alter": "alter", "insert": "insert", "explain": "explain", "update": "update", "delete": "delete" }def get_sql_type(t: tuple): return SqlType[t[0]]def get_collection(t: tuple): sql_type = get_sql_type(t) if sql_type is "select": from_tag = "from" return t[t.index(from_tag) + 1] if from_tag in t else ""class Parse:__slots__ = ()@staticmethod def parse(t: tuple): sql_type = get_sql_type(t) if sql_type is "select": table_name = t[t.index("from") + 1] if "from" in t else "" if not table_name: raise SQLMongoException("collection is empty") condition = t[t.index("where") + 1:] if "where" in t else "" columns = Parse._get_columns(t[t.index("select") + 1:t.index("from")]) if not condition: if columns == ("*",): return {} else: res = list(map(lambda x: (x, 1), columns)) if "id" in columns: res.append(("_id", 0)) return dict(res)@staticmethod def _get_columns(t: tuple)->tuple: return tuple(map(lambda x: x.strip(","), filter(lambda x: x != ",", t)))class SQLMongoException(Exception): passif __name__ == '__main__': r = split(b) s = sqlparse.format(b) print("---",s) parsed = Parse.parse(r) print(parsed)

    推荐阅读