- 首页 > it技术 > >
import sqlparse
from collections import namedtuple"""
contextManager
"""def split(s: str):
"""
it is used to split SQL string to tuple.
:param s: sql String
:return: tuple
"""
res = list(filter(lambda x: x != "" and x != ";
", s.replace("\n", "").split(" ")))
if len(res) == 0:
return tuple()
else:
res[-1] = res[-1].rstrip(";
")
return tuple(map(lambda x: x.lower(), res))def sqltuple2json(t: tuple):
from_tag = "from"
table_name = t[t.index(from_tag)+1] if from_tag in t else ""print(table_name)SqlType = {
"select": "select",
"create": "create",
"drop": "drop",
"alter": "alter",
"insert": "insert",
"explain": "explain",
"update": "update",
"delete": "delete"
}def get_sql_type(t: tuple):
return SqlType[t[0]]def get_collection(t: tuple):
sql_type = get_sql_type(t)
if sql_type is "select":
from_tag = "from"
return t[t.index(from_tag) + 1] if from_tag in t else ""class Parse:__slots__ = ()@staticmethod
def parse(t: tuple):
sql_type = get_sql_type(t)
if sql_type is "select":
table_name = t[t.index("from") + 1] if "from" in t else ""
if not table_name:
raise SQLMongoException("collection is empty")
condition = t[t.index("where") + 1:] if "where" in t else ""
columns = Parse._get_columns(t[t.index("select") + 1:t.index("from")])
if not condition:
if columns == ("*",):
return {}
else:
res = list(map(lambda x: (x, 1), columns))
if "id" in columns:
res.append(("_id", 0))
return dict(res)@staticmethod
def _get_columns(t: tuple)->tuple:
return tuple(map(lambda x: x.strip(","), filter(lambda x: x != ",", t)))class SQLMongoException(Exception):
passif __name__ == '__main__':
r = split(b)
s = sqlparse.format(b)
print("---",s)
parsed = Parse.parse(r)
print(parsed)
推荐阅读