apisix 最详细源码分析以及手撸一个 apisix

启动

  • make run 调用 ./bin/apisix start
  • 寻找 juajit 路径 运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
  • 调用 ops.lua 里的 start 方法,初始化配置,ETCD,执行openresty 启动命令
    local function start(env, ...) init(env) init_etcd(env, args) util.execute_cmd(env.openresty_args) end

  • 初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件。供 openresty(nginx)使用
    local conf_render = template.compile(ngx_tpl) local ngxconf = conf_render(sys_conf)local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf", ngxconf) if not ok then util.die("failed to update nginx.conf: ", err, "\n") end

  • 初始化 ETCD,读取ETCD集群配置,进行连接
    local version_url = host .. "/version" local errmsglocal res, err local retry_time = 0 while retry_time < 2 do res, err = request(version_url, yaml_conf) -- In case of failure, request returns nil followed by an error message. -- Else the first return value is the response body -- and followed by the response status code. if res then break end retry_time = retry_time + 1 print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s", version_url, err, retry_time)) end

  • 执行openresty 启动命令 openresty -p /usr/local/apisix -c /conf/nginx.conf
    local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]] .. apisix_home .. [[/conf/nginx.conf]]util.execute_cmd(env.openresty_args)

插件流程 nginx 配置中嵌入的 apisix 流程
init_by_lua_block { require "resty.core" apisix = require("apisix")local dns_resolver = { "172.19.2.70", "172.19.2.62", } local args = { dns_resolver = dns_resolver, } apisix.http_init(args) }init_worker_by_lua_block { apisix.http_init_worker() }exit_worker_by_lua_block { apisix.http_exit_worker() }access_by_lua_block { apisix.http_access_phase() } header_filter_by_lua_block { apisix.http_header_filter_phase() } body_filter_by_lua_block { apisix.http_body_filter_phase() } log_by_lua_block { apisix.http_log_phase() }proxy_pass$upstream_scheme://apisix_backend$upstream_uri; upstream apisix_backend { server 0.0.0.1; balancer_by_lua_block { apisix.http_balancer_phase() }keepalive 320; keepalive_requests 1000; keepalive_timeout 60s; }

  • apisix.http_init
    1.设置 dns resolver
    2.设置实例id
    3.启动 privileged agent
    core.resolver.init_resolver(args) core.id.init() local process = require("ngx.process") local ok, err = process.enable_privileged_agent()

  • apisix.http_init_worker
    function _M.http_init_worker() local seed, err = core.utils.get_seed_from_urandom() if not seed then core.log.warn('failed to get seed from urandom: ', err) seed = ngx_now() * 1000 + ngx.worker.pid() end math.randomseed(seed) -- for testing only core.log.info("random test in [1, 10000]: ", math.random(1, 10000))local we = require("resty.worker.events") local ok, err = we.configure({shm = "worker-events", interval = 0.1}) if not ok then error("failed to init worker event: " .. err) end local discovery = require("apisix.discovery.init").discovery if discovery and discovery.init_worker then discovery.init_worker() end require("apisix.balancer").init_worker() load_balancer = require("apisix.balancer") require("apisix.admin.init").init_worker()require("apisix.timers").init_worker()plugin.init_worker() router.http_init_worker() require("apisix.http.service").init_worker() plugin_config.init_worker() require("apisix.consumer").init_worker()if core.config == require("apisix.core.config_yaml") then core.config.init_worker() endrequire("apisix.debug").init_worker() apisix_upstream.init_worker() require("apisix.plugins.ext-plugin.init").init_worker()local_conf = core.config.local_conf()if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then ver_header = "APISIX" end end

1.初始化 openresty worker event
local we = require("resty.worker.events") local ok, err = we.configure({shm = "worker-events", interval = 0.1}) if not ok then error("failed to init worker event: " .. err) end

2.初始化服务发现
3.初始化balancer组件
4.初始化admin组件(会同步一次插件配置到 etcd)
sync_local_conf_to_etcd(true)

5.初始化timers组件
6.初始化 plugin 组件(清掉旧的 table, 然后把从config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash中,并按优先级排序)
local local_conf_path = profile:yaml_path("config-default") local default_conf_yaml, err = util.read_file(local_conf_path) local_conf_path = profile:yaml_path("config") local user_conf_yaml, err = util.read_file(local_conf_path) ok, err = merge_conf(default_conf, user_conf) local local_plugins = core.table.new(32, 0)for name in pairs(local_plugins_hash) do unload_plugin(name) endcore.table.clear(local_plugins) core.table.clear(local_plugins_hash)for name in pairs(processed) do load_plugin(name, local_plugins) end-- sort by plugin's priority if #local_plugins > 1 then sort_tab(local_plugins, sort_plugin) endlocal plugin_metadatas, err = core.config.new("/plugin_metadata", {automatic = true} )

7.初始化router组件(初始化 etcd /global_rules 数据)
local global_rules, err = core.config.new("/global_rules", { automatic = true, item_schema = core.schema.global_rule, checker = plugin_checker, })

8.初始化 servers 组件(从ETCD抓取 services 配置)
services, err = core.config.new("/services", { automatic = true, item_schema = core.schema.service, checker = plugin_checker, filter = filter, })

9.初始化sonsumer组件
10.同步 config_yaml 到各个进程
-- sync data in each non-master process ngx.timer.every(1, read_apisix_yaml)

11.初始化upstream组件
12.初始化 ext-plugin 组件
  • apisix.http_exit_worker()
    停止 "privileged agent"
  • apisix.http_access_phase()
    function _M.http_access_phase() local ngx_ctx = ngx.ctxif not verify_tls_client(ngx_ctx.api_ctx) then return core.response.exit(400) end-- always fetch table from the table pool, we don't need a reused api_ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctxcore.ctx.set_vars_meta(api_ctx)local uri = api_ctx.var.uri if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then if str_byte(uri, #uri) == str_byte("/") then api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1) core.log.info("remove the end of uri '/', current uri: ", api_ctx.var.uri) end endif router.api.has_route_not_under_apisix() or core.string.has_prefix(uri, "/apisix/") then local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api local matched = router.api.match(api_ctx, skip) if matched then return end endrouter.router_http.match(api_ctx)local route = api_ctx.matched_route if not route then -- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil)core.log.info("not find any matched route") return core.response.exit(404, {error_msg = "404 Route Not Found"}) endcore.log.info("matched route: ", core.json.delay_encode(api_ctx.matched_route, true))local enable_websocket = route.value.enable_websocketif route.value.plugin_config_id then local conf = plugin_config.get(route.value.plugin_config_id) if not conf then core.log.error("failed to fetch plugin config by ", "id: ", route.value.plugin_config_id) return core.response.exit(503) endroute = plugin_config.merge(route, conf) endif route.value.service_id then local service = service_fetch(route.value.service_id) if not service then core.log.error("failed to fetch service configuration by ", "id: ", route.value.service_id) return core.response.exit(404) endroute = plugin.merge_service_route(service, route) api_ctx.matched_route = route api_ctx.conf_type = "route&service" api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex api_ctx.conf_id = route.value.id .. "&" .. service.value.id api_ctx.service_id = service.value.id api_ctx.service_name = service.value.nameif enable_websocket == nil then enable_websocket = service.value.enable_websocket endelse api_ctx.conf_type = "route" api_ctx.conf_version = route.modifiedIndex api_ctx.conf_id = route.value.id end api_ctx.route_id = route.value.id api_ctx.route_name = route.value.name-- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil)if route.value.script then script.load(route, api_ctx) script.run("access", api_ctx)else local plugins = plugin.filter(route) api_ctx.plugins = pluginsplugin.run_plugin("rewrite", plugins, api_ctx) if api_ctx.consumer then local changed route, changed = plugin.merge_consumer_route( route, api_ctx.consumer, api_ctx )core.log.info("find consumer ", api_ctx.consumer.username, ", config changed: ", changed)if changed then api_ctx.matched_route = route core.table.clear(api_ctx.plugins) api_ctx.plugins = plugin.filter(route, api_ctx.plugins) end end plugin.run_plugin("access", plugins, api_ctx) endlocal up_id = route.value.upstream_id-- used for the traffic-split plugin if api_ctx.upstream_id then up_id = api_ctx.upstream_id endif up_id then local upstream = get_upstream_by_id(up_id) api_ctx.matched_upstream = upstreamelse if route.has_domain then local err route, err = parse_domain_in_route(route) if err then core.log.error("failed to get resolved route: ", err) return core.response.exit(500) endapi_ctx.conf_version = route.modifiedIndex api_ctx.matched_route = route endlocal route_val = route.value if route_val.upstream and route_val.upstream.enable_websocket then enable_websocket = true endapi_ctx.matched_upstream = (route.dns_value and route.dns_value.upstream) or route_val.upstream endif enable_websocket then api_ctx.var.upstream_upgrade= api_ctx.var.http_upgrade api_ctx.var.upstream_connection = api_ctx.var.http_connection core.log.info("enabled websocket for route: ", route.value.id) endif route.value.service_protocol == "grpc" then api_ctx.upstream_scheme = "grpc" endlocal code, err = set_upstream(route, api_ctx) if code then core.log.error("failed to set upstream: ", err) core.response.exit(code) endlocal server, err = load_balancer.pick_server(route, api_ctx) if not server then core.log.error("failed to pick server: ", err) return core.response.exit(502) endapi_ctx.picked_server = serverset_upstream_headers(api_ctx, server)-- run the before_proxy method in access phase first to avoid always reinit request common_phase("before_proxy")local ref = ctxdump.stash_ngx_ctx() core.log.info("stash ngx ctx: ", ref) ngx_var.ctx_ref = reflocal up_scheme = api_ctx.upstream_scheme if up_scheme == "grpcs" or up_scheme == "grpc" then return ngx.exec("@grpc_pass") endif api_ctx.dubbo_proxy_enabled then return ngx.exec("@dubbo_pass") end end

    1.初始化 api_ctx 上下文
    2.client tls 验证
    3.是否为 apisix 已经注册的路径,对请求进行匹配,内部使用 Radix tree 进行匹配
    4.向上下文注入该请求匹配的 apisix route ,service 等信息用于后续阶段使用
    5.向上下文注入该请求相关的插件,例如:请求对应的路由存在插件,若请求存在对应的 service 则加入 service 定义的插件,以及全局插件
    6.调用 "rewrite" 阶段的插件
    7.调用 "access" 阶段的插件
    8.获取 upstream
    9.执行 loadbalancer 选择 server
    10.调用 "balancer" 阶段插件
    11.判断 upstream ,根据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续处理流程。 这些配置可在 nginx.conf 中查看
  • apisix.http_balancer_phase()
    function _M.http_header_filter_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx()-- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end endcore.response.set_header("Server", ver_header)local up_status = get_var("upstream_status") if up_status and #up_status == 3 and tonumber(up_status) >= 500 and tonumber(up_status) <= 599 then set_resp_upstream_status(up_status) elseif up_status and #up_status > 3 then -- the up_status can be "502, 502" or "502, 502 : " local last_status if str_byte(up_status, -1) == str_byte(" ") then last_status = str_sub(up_status, -6, -3) else last_status = str_sub(up_status, -3) endif tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then set_resp_upstream_status(up_status) end endcommon_phase("header_filter") end

1.设置头 "Server", APISIX
2.设置上游状态头:X-APISIX-Upstream-Status
3.执行 “header_filter” 阶段的插件
  • apisix.http_body_filter_phase()
    function _M.http_body_filter_phase() common_phase("body_filter") end

    执行 “body_filter” 阶段的插件
  • apisix.http_log_phase()
    function _M.http_log_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx()-- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end endlocal api_ctx = common_phase("log") if not api_ctx then return endhealthcheck_passive(api_ctx)if api_ctx.server_picker and api_ctx.server_picker.after_balance then api_ctx.server_picker.after_balance(api_ctx, false) endif api_ctx.uri_parse_param then core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param) endcore.ctx.release_vars(api_ctx) if api_ctx.plugins then core.tablepool.release("plugins", api_ctx.plugins) endif api_ctx.curr_req_matched then core.tablepool.release("matched_route_record", api_ctx.curr_req_matched) endcore.tablepool.release("api_ctx", api_ctx) end

    1.执行 “log” 阶段的插件
    2.回收 plugins, matched_route_record,api_ctx 缓存
  • apisix.http_balancer_phase()
    function _M.http_balancer_phase() local api_ctx = ngx.ctx.api_ctx if not api_ctx then core.log.error("invalid api_ctx") return core.response.exit(500) endload_balancer.run(api_ctx.matched_route, api_ctx, common_phase) end

    1.设置 balance 的基本参数(超时时间,连接失败重试次数)
    2.选择上游服务器,采用一致性hash算法
    3.调用 set_current_peer 设置 proxy_pass
自定义插件逻辑 apisix 最详细源码分析以及手撸一个 apisix
文章图片

apisix 最详细源码分析以及手撸一个 apisix
文章图片

apisix 最详细源码分析以及手撸一个 apisix
文章图片

apisix 最详细源码分析以及手撸一个 apisix
文章图片

通过 common_phase 作为自定义插件的方法的公共入口,被 openresty 各个环节调用
local function common_phase(phase_name) local api_ctx = ngx.ctx.api_ctx if not api_ctx then return endplugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)if api_ctx.script_obj then script.run(phase_name, api_ctx) return api_ctx, true endreturn plugin.run_plugin(phase_name, nil, api_ctx) end

真正执行自定义插件逻辑
for i = 1, #plugins, 2 do local phase_func = plugins[i][phase] if phase_func then plugin_run = true local code, body = phase_func(plugins[i + 1], api_ctx) if code or body then if is_http then if code >= 400 then core.log.warn(plugins[i].name, " exits with http status code ", code) endcore.response.exit(code, body) else if code >= 400 then core.log.warn(plugins[i].name, " exits with status code ", code) endngx_exit(1) end end end end

admin api
location /apisix/admin { set $upstream_scheme'http'; set $upstream_host$http_host; set $upstream_uri''; allow 127.0.0.0/24; deny all; content_by_lua_block { apisix.http_admin() } }

自定义插件 入口
local function common_phase(phase_name) local api_ctx = ngx.ctx.api_ctx if not api_ctx then return endplugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)if api_ctx.script_obj then script.run(phase_name, api_ctx) return api_ctx, true endreturn plugin.run_plugin(phase_name, nil, api_ctx) end

已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log
admin api 入口
location /apisix/admin { set $upstream_scheme'http'; set $upstream_host$http_host; set $upstream_uri''; allow 127.0.0.0/24; deny all; content_by_lua_block { apisix.http_admin() } }

根据路由进行转发
local ok = router:dispatch(get_var("uri"), {method = get_method()})

具体路由 dispatch 逻辑
function _M.dispatch(self, path, opts, ...) if type(path) ~= "string" then error("invalid argument path", 2) endlocal args local len = select('#', ...) if len > 0 then if not self.args then self.args = {...} else clear_tab(self.args) for i = 1, len do self.args[i] = select(i, ...) end end-- To keep the self.args in safe, -- we can't yield until filter_fun is called args = self.args args[0] = len endlocal route, err = match_route(self, path, opts or empty_table, args) if not route then if err then return nil, err end return nil endlocal handler = route.handler if not handler or type(handler) ~= "function" then return nil, "missing handler" endhandler(...) return true end

路由与hanlder 的关系
local uri_route = { { paths = [[/apisix/admin/*]], methods = {"GET", "PUT", "POST", "DELETE", "PATCH"}, handler = run, }, { paths = [[/apisix/admin/stream_routes/*]], methods = {"GET", "PUT", "POST", "DELETE", "PATCH"}, handler = run_stream, }, { paths = [[/apisix/admin/plugins/list]], methods = {"GET"}, handler = get_plugins_list, }, { paths = reload_event, methods = {"PUT"}, handler = post_reload_plugins, }, }

调用 run 方法
local function run() local api_ctx = {} core.ctx.set_vars_meta(api_ctx) ngx.ctx.api_ctx = api_ctxlocal ok, err = check_token(api_ctx) if not ok then core.log.warn("failed to check token: ", err) core.response.exit(401) endlocal uri_segs = core.utils.split_uri(ngx.var.uri) core.log.info("uri: ", core.json.delay_encode(uri_segs))-- /apisix/admin/schema/route local seg_res, seg_id = uri_segs[4], uri_segs[5] local seg_sub_path = core.table.concat(uri_segs, "/", 6) if seg_res == "schema" and seg_id == "plugins" then -- /apisix/admin/schema/plugins/limit-count seg_res, seg_id = uri_segs[5], uri_segs[6] seg_sub_path = core.table.concat(uri_segs, "/", 7) endlocal resource = resources[seg_res] if not resource then core.response.exit(404) endlocal method = str_lower(get_method()) if not resource[method] then core.response.exit(404) endlocal req_body, err = core.request.get_body(MAX_REQ_BODY) if err then core.log.error("failed to read request body: ", err) core.response.exit(400, {error_msg = "invalid request body: " .. err}) endif req_body then local data, err = core.json.decode(req_body) if not data then core.log.error("invalid request body: ", req_body, " err: ", err) core.response.exit(400, {error_msg = "invalid request body: " .. err, req_body = req_body}) endreq_body = data endlocal uri_args = ngx.req.get_uri_args() or {} if uri_args.ttl then if not tonumber(uri_args.ttl) then core.response.exit(400, {error_msg = "invalid argument ttl: " .. "should be a number"}) end endlocal code, data = https://www.it610.com/article/resource[method](seg_id, req_body, seg_sub_path, uri_args) if code then data = strip_etcd_resp(data) core.response.exit(code, data) end end

根据 resource 定义找到对应的 lua 模板的执行方法
local resources = { routes= require("apisix.admin.routes"), services= require("apisix.admin.services"), upstreams= require("apisix.admin.upstreams"), consumers= require("apisix.admin.consumers"), schema= require("apisix.admin.schema"), ssl= require("apisix.admin.ssl"), plugins= require("apisix.admin.plugins"), proto= require("apisix.admin.proto"), global_rules= require("apisix.admin.global_rules"), stream_routes= require("apisix.admin.stream_routes"), plugin_metadata = https://www.it610.com/article/require("apisix.admin.plugin_metadata"), plugin_configs= require("apisix.admin.plugin_config"), }

执行对应模块的对应的方法,例如:
GET http://127.0.0.1:9080/apisix/...
就是直接调用 plugin_metadata 的 get 方法
function _M.get(key)local path = "/plugin_metadata" if key then path = path .. "/" .. key endlocal res, err = core.etcd.get(path, not key) if not res then core.log.error("failed to get metadata[", key, "]: ", err) return 503, {error_msg = err} endreturn res.status, res.body end

control api 入口 apisix.http_control()
server { listen 127.0.0.1:9090; access_log off; location / { content_by_lua_block { apisix.http_control() } }location @50x.html { set $from_error_page 'true'; content_by_lua_block { require("apisix.error_handling").handle_500() } } }

先注册所有的插件的 control_api 方法,调用 router:dispatch 进行路由分发
function _M.match(uri) if cached_version ~= plugin_mod.load_times then local err router, err = fetch_control_api_router() if router == nil then core.log.error("failed to fetch valid api router: ", err) return false endcached_version = plugin_mod.load_times endcore.table.clear(match_opts) match_opts.method = get_method()return router:dispatch(uri, match_opts) end

例如:server_info 插件,注册路径 /v1/server_info 并指定使用 get_server_info函数进行处理
function _M.control_api() return { { methods = {"GET"}, uris ={"/v1/server_info"}, handler = get_server_info, } } end

注册 plugin的 control_api 方法
for _, plugin in ipairs(plugin_mod.plugins) do local api_fun = plugin.control_api if api_fun then local api_route = api_fun() register_api_routes(routes, api_route) end end

通过dispatch 方法调用插件的 handler 方法
local route, err = match_route(self, path, opts or empty_table, args) if not route then if err then return nil, err end return nil endlocal handler = route.handler if not handler or type(handler) ~= "function" then return nil, "missing handler" endhandler(...)

【apisix 最详细源码分析以及手撸一个 apisix】手撸 apisix 地址 :
https://github.com/mousycoder...

    推荐阅读