From b1213c51d70c4c1a52114db277dfa0b62064e4c4 Mon Sep 17 00:00:00 2001 From: andryyy Date: Wed, 30 Aug 2017 21:42:39 +0200 Subject: [PATCH] [Rspamd] Dynamic ratelimit fixed, removed async redis request; Ready to implement per-user ratelimits via UI (tbd) --- data/Dockerfiles/rspamd/Dockerfile | 5 +- data/Dockerfiles/rspamd/lua_util.lua | 152 ++++++ data/Dockerfiles/rspamd/ratelimit.lua | 666 +++++++++++++------------ data/conf/rspamd/custom/ratelimit.lua | 69 +-- data/conf/rspamd/lua/rspamd.local.lua | 57 +++ data/web/edit.php | 6 +- data/web/inc/functions.mailbox.inc.php | 4 +- docker-compose.yml | 2 +- 8 files changed, 565 insertions(+), 396 deletions(-) create mode 100644 data/Dockerfiles/rspamd/lua_util.lua diff --git a/data/Dockerfiles/rspamd/Dockerfile b/data/Dockerfiles/rspamd/Dockerfile index cb5c7d40c..32e0fe30e 100644 --- a/data/Dockerfiles/rspamd/Dockerfile +++ b/data/Dockerfiles/rspamd/Dockerfile @@ -8,8 +8,8 @@ RUN apt-get update && apt-get install -y \ ca-certificates \ gnupg2 \ apt-transport-https \ - && apt-key adv --fetch-keys https://rspamd.com/apt-stable/gpg.key \ - && echo "deb https://rspamd.com/apt-stable/ stretch main" > /etc/apt/sources.list.d/rspamd.list \ + && apt-key adv --fetch-keys https://rspamd.com/apt/gpg.key \ + && echo "deb https://rspamd.com/apt/ stretch main" > /etc/apt/sources.list.d/rspamd.list \ && apt-get update && apt-get install -y rspamd \ && rm -rf /var/lib/apt/lists/* \ && echo '.include $LOCAL_CONFDIR/local.d/rspamd.conf.local' > /etc/rspamd/rspamd.conf.local \ @@ -20,6 +20,7 @@ RUN apt-get update && apt-get install -y \ COPY settings.conf /etc/rspamd/modules.d/settings.conf COPY ratelimit.lua /usr/share/rspamd/lua/ratelimit.lua +COPY lua_util.lua /usr/share/rspamd/lib/lua_util.lua COPY docker-entrypoint.sh /docker-entrypoint.sh ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/data/Dockerfiles/rspamd/lua_util.lua b/data/Dockerfiles/rspamd/lua_util.lua new file mode 100644 index 000000000..a9abd901a --- /dev/null +++ b/data/Dockerfiles/rspamd/lua_util.lua @@ -0,0 +1,152 @@ +local exports = {} +local lpeg = require 'lpeg' + +local split_grammar = {} +local function rspamd_str_split(s, sep) + local gr = split_grammar[sep] + + if not gr then + local _sep = lpeg.P(sep) + local elem = lpeg.C((1 - _sep)^0) + local p = lpeg.Ct(elem * (_sep * elem)^0) + gr = p + split_grammar[sep] = gr + end + + return gr:match(s) +end + +exports.rspamd_str_split = rspamd_str_split + +local space = lpeg.S' \t\n\v\f\r' +local nospace = 1 - space +local ptrim = space^0 * lpeg.C((space^0 * nospace^1)^0) +local match = lpeg.match +exports.rspamd_str_trim = function(s) + return match(ptrim, s) +end + +-- Robert Jay Gould http://lua-users.org/wiki/SimpleRound +exports.round = function(num, numDecimalPlaces) + local mult = 10^(numDecimalPlaces or 0) + return math.floor(num * mult) / mult +end + +exports.template = function(tmpl, keys) + local var_lit = lpeg.P { lpeg.R("az") + lpeg.R("AZ") + lpeg.R("09") + "_" } + local var = lpeg.P { (lpeg.P("$") / "") * ((var_lit^1) / keys) } + local var_braced = lpeg.P { (lpeg.P("${") / "") * ((var_lit^1) / keys) * (lpeg.P("}") / "") } + + local template_grammar = lpeg.Cs((var + var_braced + 1)^0) + + return lpeg.match(template_grammar, tmpl) +end + +exports.remove_email_aliases = function(email_addr) + local function check_gmail_user(addr) + -- Remove all points + local no_dots_user = string.gsub(addr.user, '%.', '') + local cap, pluses = string.match(no_dots_user, '^([^%+][^%+]*)(%+.*)$') + if cap then + return cap, rspamd_str_split(pluses, '+'), nil + elseif no_dots_user ~= addr.user then + return no_dots_user,{},nil + end + + return nil + end + + local function check_address(addr) + if addr.user then + local cap, pluses = string.match(addr.user, '^([^%+][^%+]*)(%+.*)$') + if cap then + return cap, rspamd_str_split(pluses, '+'), nil + end + end + + return nil + end + + local function set_addr(addr, new_user, new_domain) + if new_user then + addr.user = new_user + end + if new_domain then + addr.domain = new_domain + end + + if addr.domain then + addr.addr = string.format('%s@%s', addr.user, addr.domain) + else + addr.addr = string.format('%s@', addr.user) + end + + if addr.name and #addr.name > 0 then + addr.raw = string.format('"%s" <%s>', addr.name, addr.addr) + else + addr.raw = string.format('<%s>', addr.addr) + end + end + + local function check_gmail(addr) + local nu, tags, nd = check_gmail_user(addr) + + if nu then + return nu, tags, nd + end + + return nil + end + + local function check_googlemail(addr) + local nd = 'gmail.com' + local nu, tags = check_gmail_user(addr) + + if nu then + return nu, tags, nd + end + + return nil, nil, nd + end + + local specific_domains = { + ['gmail.com'] = check_gmail, + ['googlemail.com'] = check_googlemail, + } + + if email_addr then + if email_addr.domain and specific_domains[email_addr.domain] then + local nu, tags, nd = specific_domains[email_addr.domain](email_addr) + if nu or nd then + set_addr(email_addr, nu, nd) + + return nu, tags + end + else + local nu, tags, nd = check_address(email_addr) + if nu or nd then + set_addr(email_addr, nu, nd) + + return nu, tags + end + end + + return nil + end +end + +exports.is_rspamc_or_controller = function(task) + local ua = task:get_request_header('User-Agent') or '' + local pwd = task:get_request_header('Password') + local is_rspamc = false + if tostring(ua) == 'rspamc' or pwd then is_rspamc = true end + + return is_rspamc +end + +local unpack_function = table.unpack or unpack +exports.unpack = function(t) + return unpack_function(t) +end + +return exports diff --git a/data/Dockerfiles/rspamd/ratelimit.lua b/data/Dockerfiles/rspamd/ratelimit.lua index d9e8f42a4..e25ea42db 100644 --- a/data/Dockerfiles/rspamd/ratelimit.lua +++ b/data/Dockerfiles/rspamd/ratelimit.lua @@ -1,5 +1,6 @@ --[[ -Copyright (c) 2011-2015, Vsevolod Stakhov +Copyright (c) 2011-2017, Vsevolod Stakhov +Copyright (c) 2016-2017, Andrew Lewis Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,13 +19,10 @@ if confighelp then return end --- A plugin that implements ratelimits using redis or kvstorage server +-- A plugin that implements ratelimits using redis -local E = {} - --- Default settings for limits, 1-st member is burst, second is rate and the third is numeric type -local settings = { -} +local E, settings = {}, {} +local N = 'ratelimit' -- Senders that are considered as bounce local bounce_senders = {'postmaster', 'mailer-daemon', '', 'null', 'fetchmail-daemon', 'mdaemon'} -- Do not check ratelimits for these recipients @@ -35,12 +33,12 @@ local max_rcpt = 5 local redis_params local ratelimit_symbol -- Do not delay mail after 1 day -local max_delay = 24 * 3600 local use_ip_score = false -local rl_prefix = 'rl' +local rl_prefix = 'RL' local ip_score_lower_bound = 10 local ip_score_ham_multiplier = 1.1 local ip_score_spam_divisor = 1.1 +local limits_hash local message_func = function(_, limit_type) return string.format('Ratelimit "%s" exceeded', limit_type) @@ -49,12 +47,143 @@ end local rspamd_logger = require "rspamd_logger" local rspamd_util = require "rspamd_util" local rspamd_lua_utils = require "lua_util" +local lua_redis = require "lua_redis" local fun = require "fun" local user_keywords = {'user'} +local redis_script_sha +local redis_script = [[local bucket +local limited = false +local buckets = {} +local queue_id = table.remove(ARGV) +local now = table.remove(ARGV) + +local argi = 0 +for i = 1, #KEYS do + local key = KEYS[i] + local period = tonumber(ARGV[argi+1]) + local limit = tonumber(ARGV[argi+2]) + if not buckets[key] then + buckets[key] = { + max_period = period, + limits = { {period, limit} }, + } + else + table.insert(buckets[key].limits, {period, limit}) + if period > buckets[key].max_period then + buckets[key].max_period = period + end + end + argi = argi + 2 +end + +for k, v in pairs(buckets) do + local maxp = v.max_period + redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp) + for _, lim in ipairs(v.limits) do + local period = lim[1] + local limit = lim[2] + local rate + if period == maxp then + rate = redis.call('ZCARD', k) + else + rate = redis.call('ZCOUNT', k, now - period, '+inf') + end + if rate and rate >= limit then + limited = true + bucket = k + end + end + redis.call('EXPIRE', k, maxp) + if limited then break end +end + +if not limited then + for k in pairs(buckets) do + redis.call('ZADD', k, now, queue_id) + end +end + +return {limited, bucket}]] + +local redis_script_symbol = [[local limited = false +local buckets, results = {}, {} +local queue_id = table.remove(ARGV) +local now = table.remove(ARGV) + +local argi = 0 +for i = 1, #KEYS do + local key = KEYS[i] + local period = tonumber(ARGV[argi+1]) + local limit = tonumber(ARGV[argi+2]) + if not buckets[key] then + buckets[key] = { + max_period = period, + limits = { {period, limit} }, + } + else + table.insert(buckets[key].limits, {period, limit}) + if period > buckets[key].max_period then + buckets[key].max_period = period + end + end + argi = argi + 2 +end + +for k, v in pairs(buckets) do + local maxp = v.max_period + redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp) + for _, lim in ipairs(v.limits) do + local period = lim[1] + local limit = lim[2] + local rate + if period == maxp then + rate = redis.call('ZCARD', k) + else + rate = redis.call('ZCOUNT', k, now - period, '+inf') + end + if rate then + local mult = 2 * math.tanh(rate / (limit * 2)) + if mult >= 0.5 then + table.insert(results, {k, tostring(mult)}) + end + end + end + redis.call('ZADD', k, now, queue_id) + redis.call('EXPIRE', k, maxp) +end + +return results]] + +local function load_scripts(cfg, ev_base) + local function rl_script_cb(err, data) + if err then + rspamd_logger.errx(cfg, 'Script loading failed: ' .. err) + elseif type(data) == 'string' then + redis_script_sha = data + end + end + local script + if ratelimit_symbol then + script = redis_script_symbol + else + script = redis_script + end + lua_redis.redis_make_request_taskless( + ev_base, + cfg, + redis_params, + nil, -- key + true, -- is write + rl_script_cb, --callback + 'SCRIPT', -- command + {'LOAD', script} + ) +end + local limit_parser -local function parse_string_limit(lim) +local function parse_string_limit(lim, no_error) local function parse_time_suffix(s) if s == 's' then return 1 @@ -107,46 +236,16 @@ local function parse_string_limit(lim) local t = lpeg.match(limit_parser.limit, lim) if t and t[1] and t[2] and t[2] ~= 0 then - return t[1] / t[2], t[1] + return t[2], t[1] end - rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim) + if not no_error then + rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim) + end return nil end ---- Parse atime and bucket of limit -local function parse_limits(data) - local function parse_limit_elt(str) - local elts = rspamd_str_split(str, ':') - if not elts or #elts < 2 then - return {0, 0, 0} - else - local atime = tonumber(elts[1]) - local bucket = tonumber(elts[2]) - local ctime = atime - - if elts[3] then - ctime = tonumber(elts[3]) - end - - if not ctime then - ctime = atime - end - - return {atime,bucket,ctime} - end - end - - return fun.iter(data):map(function(e) - if type(e) == 'string' then - return parse_limit_elt(e) - else - return {0, 0, 0} - end - end):totable() -end - local function resize_element(x_score, x_total, element) local x_ip_score if not x_total then x_total = 0 end @@ -191,7 +290,7 @@ local keywords = { ['get_value'] = function(task) local from = task:get_from(0) if ((from or E)[1] or E).addr then - return from[1]['addr'] + return string.lower(from[1]['addr']) end return nil end, @@ -270,7 +369,7 @@ local function dynamic_rate_key(task, rtype) local total_rcpt = 0 for _, r in ipairs(rcpts) do if r['addr'] and total_rcpt < max_rcpt then - local key_f = string.format(key_s, r['addr']) + local key_f = string.format(key_s, string.lower(r['addr'])) table.insert(rate_keys, key_f) total_rcpt = total_rcpt + 1 end @@ -279,194 +378,86 @@ local function dynamic_rate_key(task, rtype) end end ---- Check specific limit inside redis -local function check_limits(task, args) - - local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args) - local ret - --- Called when value is got from server - local function rate_get_cb(err, data) - if err then - rspamd_logger.infox(task, 'got error while getting limit: %1', err) - end - if not data then return end - local ntime = rspamd_util.get_time() - local asn_score,total_asn, - country_score,total_country, - ipnet_score,total_ipnet, - ip_score, total_ip - if use_ip_score then - asn_score,total_asn, - country_score,total_country, - ipnet_score,total_ipnet, - ip_score, total_ip = task:get_mempool():get_variable('ip_score', - 'double,double,double,double,double,double,double,double') - end - - fun.each(function(elt, limit, rtype) - local bucket = elt[2] - local rate = limit[2] - local threshold = limit[1] - local atime = elt[1] - local ctime = elt[3] - - if atime == 0 then return end - - if use_ip_score then - local key_keywords = rspamd_str_split(rtype, '_') - local has_asn, has_ip = false, false - for _, v in ipairs(key_keywords) do - if v == "asn" then has_asn = true end - if v == "ip" then has_ip = true end - if has_ip and has_asn then break end - end - if has_asn and not has_ip then - bucket = resize_element(asn_score, total_asn, bucket) - rate = resize_element(asn_score, total_asn, rate) - elseif has_ip then - if total_ip and total_ip > ip_score_lower_bound then - bucket = resize_element(ip_score, total_ip, bucket) - rate = resize_element(ip_score, total_ip, rate) - elseif total_ipnet and total_ipnet > ip_score_lower_bound then - bucket = resize_element(ipnet_score, total_ipnet, bucket) - rate = resize_element(ipnet_score, total_ipnet, rate) - elseif total_asn and total_asn > ip_score_lower_bound then - bucket = resize_element(asn_score, total_asn, bucket) - rate = resize_element(asn_score, total_asn, rate) - elseif total_country and total_country > ip_score_lower_bound then - bucket = resize_element(country_score, total_country, bucket) - rate = resize_element(country_score, total_country, rate) - else - bucket = resize_element(ip_score, total_ip, bucket) - rate = resize_element(ip_score, total_ip, rate) - end - end - end - - if atime - ctime > max_delay then - rspamd_logger.infox(task, 'limit is too old: %1 seconds; ignore it', - atime - ctime) - else - bucket = bucket - rate * (ntime - atime); - if bucket > 0 then - if ratelimit_symbol then - local mult = 2 * rspamd_util.tanh(bucket / (threshold * 2)) - - if mult > 0.5 then - task:insert_result(ratelimit_symbol, mult, - rtype .. ':' .. string.format('%.2f', mult)) - end - else - if bucket > threshold then - rspamd_logger.infox(task, - 'ratelimit "%s" exceeded: %s elements with %s limit', - rtype, bucket, threshold) - task:set_pre_result('soft reject', - message_func(task, rtype, bucket, threshold)) - end - end - end - end - end, fun.zip(parse_limits(data), fun.map(function(a) return a[1] end, args), - fun.map(function(a) return rspamd_str_split(a[2], ":")[2] end, args))) - end - - ret = rspamd_redis_make_request(task, - redis_params, -- connect params - key, -- hash key - false, -- is write - rate_get_cb, --callback - 'mget', -- command - fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments - ) - if not ret then - rspamd_logger.errx(task, 'got error connecting to redis') - end -end - ---- Set specific limit inside redis -local function set_limits(task, args) - local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args) - local ret, upstream - - local function rate_set_cb(err) - if err then - rspamd_logger.infox(task, 'got error %s when setting ratelimit record on server %s', - err, upstream:get_addr()) - end - end - local function rate_get_cb(err, data) +local function process_buckets(task, buckets) + if not buckets then return end + local function rl_redis_cb(err, data) if err then rspamd_logger.infox(task, 'got error while setting limit: %1', err) end if not data then return end - local ntime = rspamd_util.get_time() - local values = {} - fun.each(function(elt, limit) - local bucket = elt[2] - local rate = limit[1][2] - local atime = elt[1] - local ctime = elt[3] - - if atime - ctime > max_delay then - rspamd_logger.infox(task, 'limit is too old: %1 seconds; start it over', - atime - ctime) - bucket = 1 - ctime = ntime - else - if bucket > 0 then - bucket = bucket - rate * (ntime - atime) + 1; - if bucket < 0 then - bucket = 1 - end - else - bucket = 1 - end - end - - if ctime == 0 then ctime = ntime end - - local lstr = string.format('%.3f:%.3f:%.3f', ntime, bucket, ctime) - table.insert(values, {limit[2], max_delay, lstr}) - end, fun.zip(parse_limits(data), fun.iter(args))) - - if #values > 0 then - local conn - ret,conn,upstream = rspamd_redis_make_request(task, - redis_params, -- connect params - key, -- hash key - true, -- is write - rate_set_cb, --callback - 'setex', -- command - values[1] -- arguments - ) - - if conn then - fun.each(function(v) - conn:add_cmd('setex', v) - end, fun.drop_n(1, values)) - else - rspamd_logger.errx(task, 'got error while connecting to redis') - end + if data[1] == 1 then + rspamd_logger.infox(task, + 'ratelimit "%s" exceeded', + data[2]) + task:set_pre_result('soft reject', + message_func(task, data[2])) end end - - local _ - ret,_,upstream = rspamd_redis_make_request(task, + local function rl_symbol_redis_cb(err, data) + if err then + rspamd_logger.infox(task, 'got error while setting limit: %1', err) + end + if not data then return end + for i, b in ipairs(data) do + task:insert_result(ratelimit_symbol, b[2], string.format('%s:%s:%s', i, b[1], b[2])) + end + end + local redis_cb = rl_redis_cb + if ratelimit_symbol then redis_cb = rl_symbol_redis_cb end + local args = {redis_script_sha, #buckets} + for _, bucket in ipairs(buckets) do + table.insert(args, bucket[2]) + end + for _, bucket in ipairs(buckets) do + if use_ip_score then + local asn_score,total_asn, + country_score,total_country, + ipnet_score,total_ipnet, + ip_score, total_ip = task:get_mempool():get_variable('ip_score', + 'double,double,double,double,double,double,double,double') + local key_keywords = rspamd_str_split(bucket[2], '_') + local has_asn, has_ip = false, false + for _, v in ipairs(key_keywords) do + if v == "asn" then has_asn = true end + if v == "ip" then has_ip = true end + if has_ip and has_asn then break end + end + if has_asn and not has_ip then + bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2]) + elseif has_ip then + if total_ip and total_ip > ip_score_lower_bound then + bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2]) + elseif total_ipnet and total_ipnet > ip_score_lower_bound then + bucket[1][2] = resize_element(ipnet_score, total_ipnet, bucket[1][2]) + elseif total_asn and total_asn > ip_score_lower_bound then + bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2]) + elseif total_country and total_country > ip_score_lower_bound then + bucket[1][2] = resize_element(country_score, total_country, bucket[1][2]) + else + bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2]) + end + end + end + table.insert(args, bucket[1][1]) + table.insert(args, bucket[1][2]) + end + table.insert(args, rspamd_util.get_time()) + table.insert(args, task:get_queue_id() or task:get_uid()) + local ret = rspamd_redis_make_request(task, redis_params, -- connect params - key, -- hash key - false, -- is write - rate_get_cb, --callback - 'mget', -- command - fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments + nil, -- hash key + true, -- is write + redis_cb, --callback + 'evalsha', -- command + args -- arguments ) if not ret then rspamd_logger.errx(task, 'got error connecting to redis') end end ---- Check or update ratelimit -local function rate_test_set(task, func) +local function ratelimit_cb(task) + if rspamd_lua_utils.is_rspamc_or_controller(task) then return end local args = {} -- Get initial task data local ip = task:get_from_ip() @@ -481,9 +472,13 @@ local function rate_test_set(task, func) local rcpts = task:get_recipients() local rcpts_user = {} if rcpts then - fun.each(function(r) table.insert(rcpts_user, r['user']) end, rcpts) - if fun.any(function(r) - fun.any(function(w) return r == w end, whitelisted_rcpts) end, + fun.each(function(r) + fun.each(function(type) table.insert(rcpts_user, r[type]) end, {'user', 'addr'}) + end, rcpts) + if fun.any( + function(r) + if fun.any(function(w) return r == w end, whitelisted_rcpts) then return true end + end, rcpts_user) then rspamd_logger.infox(task, 'skip ratelimit for whitelisted recipient') @@ -499,110 +494,136 @@ local function rate_test_set(task, func) end end + local redis_keys = {} + local redis_keys_rev = {} + local function collect_redis_keys() + local function collect_cb(err, data) + if err then + rspamd_logger.errx(task, 'redis error: %1', err) + else + for i, d in ipairs(data) do + if type(d) == 'string' then + local plim, size = parse_string_limit(d) + if plim then + table.insert(args, {{plim, size}, redis_keys_rev[i]}) + end + end + end + return process_buckets(task, args) + end + end + local params, method + if limits_hash then + params = {limits_hash, rspamd_lua_utils.unpack(redis_keys)} + method = 'HMGET' + else + method = 'MGET' + params = redis_keys + end + local requested_keys = rspamd_redis_make_request(task, + redis_params, -- connect params + nil, -- hash key + true, -- is write + collect_cb, --callback + method, -- command + params -- arguments + ) + if not requested_keys then + rspamd_logger.errx(task, 'got error connecting to redis') + return process_buckets(task, args) + end + end + local rate_key for k in pairs(settings) do rate_key = dynamic_rate_key(task, k) if rate_key then if type(rate_key) == 'table' then for _, rk in ipairs(rate_key) do - if type(settings[k]) == 'table' then - table.insert(args, {settings[k], rk}) - elseif type(settings[k]) == 'string' and + if type(settings[k]) == 'string' and (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then local res = custom_keywords[settings[k]]['get_limit'](task) - if type(res) == 'table' then - table.insert(args, {res, rate_key}) - elseif type(res) == 'string' then - local plim, size = parse_string_limit(res) + if type(res) == 'string' then res = {res} end + for _, r in ipairs(res) do + local plim, size = parse_string_limit(r, true) if plim then - table.insert(args, {{size, plim, 1}, rate_key}) + table.insert(args, {{plim, size}, rk}) + else + local rkey = string.match(settings[k], 'redis:(.*)') + if rkey then + table.insert(redis_keys, rkey) + redis_keys_rev[#redis_keys] = rk + else + rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k]) + end end end end end else - if type(settings[k]) == 'table' then - table.insert(args, {settings[k], rate_key}) - elseif type(settings[k]) == 'string' and - (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then + if type(settings[k]) == 'string' and + (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then local res = custom_keywords[settings[k]]['get_limit'](task) - if type(res) == 'table' then - table.insert(args, {res, rate_key}) - elseif type(res) == 'string' then - local plim, size = parse_string_limit(res) + if type(res) == 'string' then res = {res} end + for _, r in ipairs(res) do + local plim, size = parse_string_limit(r, true) if plim then - table.insert(args, {{size, plim, 1}, rate_key}) + table.insert(args, {{plim, size}, rate_key}) + else + local rkey = string.match(r, 'redis:(.*)') + if rkey then + table.insert(redis_keys, rkey) + redis_keys_rev[#redis_keys] = rate_key + else + rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k]) + end end end + elseif type(settings[k]) == 'table' then + for _, rl in ipairs(settings[k]) do + table.insert(args, {{rl[1], rl[2]}, rate_key}) + end + elseif type(settings[k]) == 'string' then + local rkey = string.match(settings[k], 'redis:(.*)') + if rkey then + table.insert(redis_keys, rkey) + redis_keys_rev[#redis_keys] = rate_key + else + rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k]) + end end end end end - if #args > 0 then - func(task, args) + if redis_keys[1] then + return collect_redis_keys() + else + return process_buckets(task, args) end end ---- Check limit -local function rate_test(task) - if rspamd_lua_utils.is_rspamc_or_controller(task) then return end - rate_test_set(task, check_limits) -end ---- Update limit -local function rate_set(task) - local action = task:get_metric_action('default') - - if action ~= 'soft reject' then - if rspamd_lua_utils.is_rspamc_or_controller(task) then return end - rate_test_set(task, set_limits) - end -end - - ---- Parse a single limit description -local function parse_limit(str) - local params = rspamd_str_split(str, ':') - - local function set_limit(limit, burst, rate) - limit[1] = tonumber(burst) - limit[2] = tonumber(rate) - end - - if #params ~= 3 then - rspamd_logger.errx(rspamd_config, 'invalid limit definition: ' .. str) - return - end - - local key_keywords = rspamd_str_split(params[1], '_') - for _, k in ipairs(key_keywords) do - if (custom_keywords[k] and type(custom_keywords[k]['get_value']) == 'function') or - (keywords[k] and type(keywords[k]['get_value']) == 'function') then - set_limit(settings[params[1]], params[2], params[3]) - else - rspamd_logger.errx(rspamd_config, 'invalid limit type: ' .. params[1]) - end - end -end - -local opts = rspamd_config:get_all_opt('ratelimit') +local opts = rspamd_config:get_all_opt(N) if opts then - local rates = opts['limit'] - if rates and type(rates) == 'table' then - fun.each(parse_limit, rates) - elseif rates and type(rates) == 'string' then - parse_limit(rates) + if opts['limit'] then + rspamd_logger.errx(rspamd_config, 'Legacy ratelimit config format no longer supported') end if opts['rates'] and type(opts['rates']) == 'table' then -- new way of setting limits fun.each(function(t, lim) if type(lim) == 'table' then - settings[t] = lim + settings[t] = {} + fun.each(function(l) + local plim, size = parse_string_limit(l) + if plim then + table.insert(settings[t], {plim, size}) + end + end, lim) elseif type(lim) == 'string' then local plim, size = parse_string_limit(lim) if plim then - settings[t] = {size, plim, 1} + settings[t] = { {plim, size} } end end end, opts['rates']) @@ -618,11 +639,7 @@ if opts then local enabled_limits = fun.totable(fun.map(function(t) return t - end, fun.filter(function(_, lim) - return type(lim) == 'string' or - (type(lim) == 'table' and type(lim[1]) == 'number' and lim[1] > 0) - or (type(lim) == 'table' and (lim[3])) - end, settings))) + end, settings)) rspamd_logger.infox(rspamd_config, 'enabled rate buckets: [%1]', table.concat(enabled_limits, ',')) if opts['whitelisted_rcpts'] and type(opts['whitelisted_rcpts']) == 'string' then @@ -650,10 +667,6 @@ if opts then max_rcpt = tonumber(opts['max_rcpt']) end - if opts['max_delay'] then - max_rcpt = tonumber(opts['max_delay']) - end - if opts['use_ip_score'] then use_ip_score = true local ip_score_opts = rspamd_config:get_all_opt('ip_score') @@ -674,38 +687,30 @@ if opts then message_func = assert(load(opts['message_func']))() end + if opts['limits_hash'] then + limits_hash = opts['limits_hash'] + end + redis_params = rspamd_parse_redis_server('ratelimit') if not redis_params then rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') else - if not ratelimit_symbol and not use_ip_score then - rspamd_config:register_symbol({ - name = 'RATELIMIT_CHECK', - callback = rate_test, - type = 'prefilter', - priority = 4, - }) - else - local symbol - if not ratelimit_symbol then - symbol = 'RATELIMIT_CHECK' - else - symbol = ratelimit_symbol - end - local id = rspamd_config:register_symbol({ - name = symbol, - callback = rate_test, - }) - if use_ip_score then - rspamd_config:register_dependency(id, 'IP_SCORE') - end + local s = { + type = 'prefilter,nostat', + name = 'RATELIMIT_CHECK', + priority = 4, + callback = ratelimit_cb, + } + if use_ip_score then + s.type = 'normal' + end + if ratelimit_symbol then + s.name = ratelimit_symbol + end + local id = rspamd_config:register_symbol(s) + if use_ip_score then + rspamd_config:register_dependency(id, 'IP_SCORE') end - rspamd_config:register_symbol({ - name = 'RATELIMIT_SET', - type = 'postfilter', - priority = 5, - callback = rate_set, - }) for _, v in pairs(custom_keywords) do if type(v) == 'table' and type(v['init']) == 'function' then v['init']() @@ -713,5 +718,6 @@ if opts then end end end - - +rspamd_config:add_on_load(function(cfg, ev_base, worker) + load_scripts(cfg, ev_base) +end) diff --git a/data/conf/rspamd/custom/ratelimit.lua b/data/conf/rspamd/custom/ratelimit.lua index ed6b1bd6b..a7beac49f 100644 --- a/data/conf/rspamd/custom/ratelimit.lua +++ b/data/conf/rspamd/custom/ratelimit.lua @@ -1,66 +1,25 @@ local custom_keywords = { ['customrl'] = {}, } + function custom_keywords.customrl.get_value(task) local rspamd_logger = require "rspamd_logger" - local rspamd_redis = require "rspamd_redis" - local rspamd_regexp = require "rspamd_regexp" - local re = rspamd_regexp.create('/^\\s*$/i') - local envfrom = task:get_from(1) - local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case - local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case - - local function rlo(object) -- get ratelimited object - local rlobj = string.format('%s', object) - - local rl_ret, rl_obj = rspamd_redis.make_request_sync({host="172.22.1.249:6379", cmd='HGET', args={'RL_OBJECT', rlobj}, timeout=2.0}) - - if rl_ret and rl_obj then - return rl_obj - else - return false - end - end - - rl_addr = rlo(env_from_addr) - rl_domain = rlo(env_from_domain) - if type(rl_addr) == 'string' and not re:match(rl_addr) then - rspamd_logger.infox(rspamd_config, "returning ratelimit object for %s", env_from_addr) - return rl_addr - elseif type(rl_domain) == 'string' and not re:match(rl_domain) then - rspamd_logger.infox(rspamd_config, "returning ratelimit object for %s", env_from_domain) - return rl_domain + if task:has_symbol('DYN_RL') then + rspamd_logger.infox(rspamd_config, "task has a dynamic ratelimit symbol, processing...") + return "check" + else + rspamd_logger.infox(rspamd_config, "task has no dynamic ratelimit symbol, skipping...") + return end end function custom_keywords.customrl.get_limit(task) local rspamd_logger = require "rspamd_logger" - local rspamd_redis = require "rspamd_redis" - local rspamd_regexp = require "rspamd_regexp" - local re = rspamd_regexp.create('/^\\s*$/i') - local envfrom = task:get_from(1) - local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case - local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case - - local function rlv(object) -- get ratelimited object - local rlobj = string.format('%s', object) - - local rl_ret, rl_value = rspamd_redis.make_request_sync({host="172.22.1.249:6379", cmd='HGET', args={'RL_VALUE', rlobj}, timeout=2.0}) - - if rl_ret and rl_value then - return rl_value - else - return false - end - end - - rl_addr = rlv(env_from_addr) - rl_domain = rlv(env_from_domain) - if type(rl_addr) == 'string' and not re:match(rl_addr) then - rspamd_logger.infox(rspamd_config, "returning ratelimit %s for %s", rl_addr, env_from_addr) - return rl_addr - elseif type(rl_domain) == 'string' and not re:match(rl_domain) then - rspamd_logger.infox(rspamd_config, "returning ratelimit %s for %s", rl_domain, env_from_domain) - return rl_domain + local dyn_rl_symbol = task:get_symbol("DYN_RL") + if dyn_rl_symbol then + local rl_value = dyn_rl_symbol[1].options[1] + rspamd_logger.infox(rspamd_config, "dynamic ratelimit symbol has option %s, returning...", rl_value) + return rl_value end end -return custom_keywords +-- returning custom keywords +return custom_keywords \ No newline at end of file diff --git a/data/conf/rspamd/lua/rspamd.local.lua b/data/conf/rspamd/lua/rspamd.local.lua index 75f528921..623837073 100644 --- a/data/conf/rspamd/lua/rspamd.local.lua +++ b/data/conf/rspamd/lua/rspamd.local.lua @@ -50,4 +50,61 @@ rspamd_config:register_symbol({ priority = 11 }) +rspamd_config:register_symbol({ + name = 'DYN_RL_CHECK', + type = 'prefilter', + callback = function(task) + local util = require("rspamd_util") + local redis_params = rspamd_parse_redis_server('dyn_rl') + local rspamd_logger = require "rspamd_logger" + local envfrom = task:get_from(1) + local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case + local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case + local function redis_cb_user(err, data) + + if err or type(data) ~= 'string' then + rspamd_logger.infox(rspamd_config, "dynamic ratelimit request for user %s returned invalid or empty data (\"%s\") or error (\"%s\") - trying dynamic ratelimit for domain...", env_from_addr, data, err) + + local function redis_key_cb_domain(err, data) + if err or type(data) ~= 'string' then + rspamd_logger.infox(rspamd_config, "dynamic ratelimit request for domain %s returned invalid or empty data (\"%s\") or error (\"%s\")", env_from_domain, data, err) + else + rspamd_logger.infox(rspamd_config, "found dynamic ratelimit in redis for domain %s with value %s", env_from_domain, data) + task:insert_result('DYN_RL', 0.0, data) + end + end + + local redis_ret_domain = rspamd_redis_make_request(task, + redis_params, -- connect params + env_from_domain, -- hash key + false, -- is write + redis_key_cb_domain, --callback + 'HGET', -- command + {'RL_VALUE', env_from_domain} -- arguments + ) + if not redis_ret_domain then + rspamd_logger.infox(rspamd_config, "cannot make request to load ratelimit for domain") + end + else + rspamd_logger.infox(rspamd_config, "found dynamic ratelimit in redis for user %s with value %s", env_from_addr, data) + task:insert_result('DYN_RL', 0.0, data) + end + + end + + local redis_ret_user = rspamd_redis_make_request(task, + redis_params, -- connect params + env_from_addr, -- hash key + false, -- is write + redis_cb_user, --callback + 'HGET', -- command + {'RL_VALUE', env_from_addr} -- arguments + ) + if not redis_ret_user then + rspamd_logger.infox(rspamd_config, "cannot make request to load ratelimit for user") + end + return true + end, + priority = 20 +}) \ No newline at end of file diff --git a/data/web/edit.php b/data/web/edit.php index b40f1ae00..4511ab943 100644 --- a/data/web/edit.php +++ b/data/web/edit.php @@ -238,7 +238,6 @@ if (isset($_SESSION['mailcow_cc_role'])) { } ?>
-

@@ -316,7 +314,7 @@ if (isset($_SESSION['mailcow_cc_role'])) { !empty($_GET["aliasdomain"])) { $alias_domain = $_GET["aliasdomain"]; $result = mailbox('get', 'alias_domain_details', $alias_domain); - // $rl = mailbox('get', 'domain_ratelimit', $alias_domain); + $rl = mailbox('get', 'domain_ratelimit', $alias_domain); if (!empty($result)) { ?>

@@ -341,7 +339,6 @@ if (isset($_SESSION['mailcow_cc_role'])) {
- diff --git a/data/web/inc/functions.mailbox.inc.php b/data/web/inc/functions.mailbox.inc.php index 7769e0d31..e62611721 100644 --- a/data/web/inc/functions.mailbox.inc.php +++ b/data/web/inc/functions.mailbox.inc.php @@ -1214,7 +1214,6 @@ function mailbox($_action, $_type, $_data = null) { } if (empty($rl_value)) { try { - $redis->hDel('RL_OBJECT', $domain); $redis->hDel('RL_VALUE', $domain); } catch (RedisException $e) { @@ -1227,7 +1226,6 @@ function mailbox($_action, $_type, $_data = null) { } else { try { - $redis->hSet('RL_OBJECT', $domain, '1'); $redis->hSet('RL_VALUE', $domain, $rl_value . ' / 1' . $rl_frame); } catch (RedisException $e) { @@ -2389,7 +2387,7 @@ function mailbox($_action, $_type, $_data = null) { return false; } try { - if (($rl_value = $redis->hGet('RL_VALUE', $_data)) && $redis->hGet('RL_OBJECT', $_data)) { + if ($rl_value = $redis->hGet('RL_VALUE', $_data)) { $rl = explode(' / 1', $rl_value); $data['value'] = $rl[0]; $data['frame'] = $rl[1]; diff --git a/docker-compose.yml b/docker-compose.yml index d849e6321..0d973dfe2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -80,7 +80,7 @@ services: - clamd rspamd-mailcow: - image: mailcow/rspamd:1.5 + image: mailcow/rspamd:1.6 build: ./data/Dockerfiles/rspamd command: > /bin/bash -c "