Python如何检测欺诈交易?了解如何使用 Apache Kafka 和 Python API 检测和过滤流应用程序中的欺诈交易。
Python Kafka检测流媒体欺诈交易:随着系统的扩展和迁移,打击欺诈者已成为最重要的挑战,最好使用实时流处理技术来解决。
Python Kafka检测欺诈交易教程:出于本教程的目的,我们将从头开始构建一个实时欺诈检测系统,在该系统中我们将生成一个虚构的交易流并同步分析它们以检测哪些是欺诈的。
先决条件根据我们的要求,我们需要一个可靠、可扩展和容错的事件流平台,用于存储我们的输入事件或交易和处理结果。Apache Kafka是一个开源分布式流媒体平台,正好满足了这一需求。
Apache Kafka 可以从其官方站点下载,在你的操作系统上安装和运行 Apache Kafka 不在本教程的范围内。但是,你可以查看本指南,了解如何在 Ubuntu(或任何基于 Debian 的发行版)上安装它。
完成所有设置后,你可以检查以下内容:
- Zookeeper 实例正在TCP 端口 2181上运行。
- 一个 Kafka 实例正在运行并绑定到TCP 端口 9092。
创建应用程序骨架我们现在可以开始使用 Kafka 的消费者和生产者 API 构建我们的实时欺诈检测应用程序。
我们的应用程序将包括:
- 一个交易生成器,一方面,它产生虚构的交易来模拟事件流。
- 另一方面,欺诈检测器可过滤掉看起来可疑的交易。
文章图片
Python如何检测欺诈交易?让我们直接进入设置。当然,你需要在系统上安装 Python 3。我将使用一个虚拟环境来安装所需的库,这无疑是选择的最佳方法:
- 创建一个虚拟环境并激活它:
$ python -m venv fraud-detector $ source fraud-detector/bin/activate
复制 - 创建文件
requirements.txt
并向其添加以下行:Kafka-python==2.0.2 Flask==1.1.2
复制 - 安装库:
$ pip install -r requirements.txt
复制
文章图片
清楚了这一点,让我们现在开始编写实际代码。
首先,让我们在我们的
settings.py
文件中初始化我们的参数:# URL for our broker used for connecting to the Kafka cluster
KAFKA_BROKER_URL= "localhost:9092"
# name of the topic hosting the transactions to be processed and requiring processing
TRANSACTIONS_TOPIC = "queuing.transactions"
# these 2 variables will control the amount of transactions automatically generated
TRANSACTIONS_PER_SECOND = float("2.0")
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
# name of the topic hosting the legitimate transactions
LEGIT_TOPIC = "queuing.legit"
# name of the topic hosting the suspicious transactions
FRAUD_TOPIC = "queuing.fraud"
注意:为简洁起见,我将配置参数硬编码在 中
settings.py
,但建议将这些参数存储在单独的文件中(例如 .env)其次,让我们创建一个 Python 文件,
transactions.py
用于动态创建随机交易:from random import choices, randint
from string import ascii_letters, digitsaccount_chars: str = digits + ascii_lettersdef _random_account_id() -> str:
"""Return a random account number made of 12 characters"""
return "".join(choices(account_chars,k=12))def _random_amount() -> float:
"""Return a random amount between 1.00 and 1000.00"""
return randint(100,1000000)/100def create_random_transaction() -> dict:
"""Create a fake randomised transaction."""
return {
"source":_random_account_id()
,"target":_random_account_id()
,"amount":_random_amount()
,"currency":"EUR"
}
Python Kafka检测欺诈交易教程:第三,让我们构建我们的交易生成器,它将用于创建交易流。Python 文件
producer.py
将扮演交易生成器的角色,并将发布的交易存储在名为 的主题中queuing.transactions
,以下是 的代码producer.py
:import os
import json
from time import sleep
from kafka import KafkaProducer
# import initialization parameters
from settings import *
from transactions import create_random_transactionif __name__ == "__main__":
producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL
#Encode all values as JSON
,value_serializer = lambda value: json.dumps(value).encode()
,)
while True:
transaction: dict = create_random_transaction()
producer.send(TRANSACTIONS_TOPIC, value= https://www.lsbin.com/transaction)
print(transaction) #DEBUG
sleep(SLEEP_TIME)
为了确保你走在正确的轨道上,让我们测试一下
producer.py
程序。为此,请打开终端窗口并键入以下内容:$ python producer.py
注意:在执行测试之前,请确保 Kafka 服务器正在运行。
你应该会看到类似于以下内容的输出:
文章图片
第四,在我们保证我们的生产者程序启动并运行之后,让我们现在开始构建欺诈检测机制来处理交易流并查明欺诈交易。
Python Kafka检测流媒体欺诈交易:我们将开发这个程序的两个版本:
- 版本 1
detector.py
:该程序将根据特定标准或一组标准过滤掉排队的交易,并将结果输出到两个单独的主题中:一个用于合法交易LEGIT_TOPIC
,另一个FRAUD_TOPIC
用于满足我们选择的标准的欺诈交易。
下面是代码
detector.py
:import os
import json
from kafka import KafkaConsumer, KafkaProducer
from settings import *def is_suspicious(transaction: dict) -> bool:
"""Simple condition to determine whether a transaction is suspicious."""
return transaction[
"amount"] >= 900if __name__ == "__main__":
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC
,bootstrap_servers=KAFKA_BROKER_URL
,value_deserializer = lambda value: json.loads(value)
,
)for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
print(topic,transaction) #DEBUG
为简单起见,我选择了一个
is_suspicious()
基于简单谓词的基本条件函数(即如果交易金额大于等于900,则为可疑)。但是,在现实生活场景中,可能会涉及到很多参数,其中:- 无论是活动的、非活动的还是休眠的,发起方都负责交易。
- 交易的位置,如果是从一个在锁定期间应该关闭的实体发起的。
现在让我们测试
producer.py
和detector.py
,打开一个新的终端窗口并输入以下内容:$ python producer.py
同时,打开另一个窗口并输入:
$ python detector.py
注意:在执行测试之前,请确保 Kafka 服务器正在运行。
你将在 上看到与此类似的输出
detector.py
:文章图片
Python如何检测欺诈交易?程序中包含的调试打印会将交易输出到控制台,并根据我们指定的条件和相关的交易金额指示目标队列。
- 版本 2
appdetector.py
:这是检测器.py 脚本的高级版本,它将使用 Flask 微框架将交易流式传输到网络。
- Flask:Python 中的微型 Web 框架。
- 服务器发送事件 (SSE):一种服务器推送机制,其中客户端订阅服务器生成的更新流,并且每当发生新事件时,都会向客户端发送通知。
- Jinja2:在 Python 生态系统中广泛使用的现代模板引擎。Flask 默认支持 Jinja2。
appdetector.py
:from flask import Flask, Response, stream_with_context, render_template, json, url_forfrom kafka import KafkaConsumer
from settings import *# create the flask object app
app = Flask(__name__)def stream_template(template_name, **context):
print('template name =',template_name)
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rvdef is_suspicious(transaction: dict) -> bool:
"""Determine whether a transaction is suspicious."""
return transaction[
"amount"] >= 900# this router will render the template named index.html and will pass the following parameters to it:
# title and Kafka stream
@app.route('/')
def index():
def g():
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC
, bootstrap_servers=KAFKA_BROKER_URL
, value_deserializer=lambda value: json.loads(value)
,
)
for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
print(topic, transaction)# DEBUG
yield topic, transactionreturn Response(stream_template('index.html', title='Fraud Detector / Kafka',data=https://www.lsbin.com/g()))if __name__ =="__main__":
app.run(host="localhost" , debug=True)
接下来,我们将定义路由函数
index.html
使用的模板HTML 文件,该文件index()
位于templates
文件夹下:<
!doctype html>
<
title> Send Javascript with template demo <
/title>
<
html>
<
head>
<
/head>
<
body>
<
div class="container">
<
h1>{{title}}<
/h1>
<
/div>
<
div id="data"><
/div>
{% for topic, transaction in data: %}
<
script>
var topic = "{{ topic }}";
var transaction = "{{ transaction }}";
if (topic.search("fraud") > 0) {
topic = topic.fontcolor("red")
} else {
topic = topic.fontcolor("green")
}
document.getElementById('data').innerHTML += "<
br>" + topic + " " + transaction;
<
/script>
{% endfor %}
<
/body>
<
/html>
该
index.html
包含一个Javascript实现遍历整个接收到的数据流和他们收到以显示交易。现在运行它,确保
producer.py
正在运行,然后:$ python appdetector.py
Python Kafka检测流媒体欺诈交易:它应该在端口 5000 上启动一个本地服务器,转到你的浏览器并访问
http://localhost:5000
Flask 实例运行的位置,你将看到合法和欺诈交易的连续流,如下面的屏幕所示:文章图片
你可以在此处查看完整代码。
【在Python中使用Kafka检测流媒体应用程序中的欺诈交易】本Python Kafka检测欺诈交易教程说明了如何在欺诈检测应用程序中应用流处理范例。
推荐阅读
- Python使用Huggingface Transformers实现对话式AI聊天机器人
- 使用Python中的Imblearn和Smote变体库进行不平衡学习
- 如何在Python中使用Transformers从头开始??训练BERT()
- 如何在Python中使用Plotly创建绘图(完整指南)
- 一键u盘装xp系统,教您如何运用U盘装xp系统
- u盘是谁发明的,教您你一些不知道的事
- 超微主板怎样设置u盘打开,教您超微主板怎样设置u盘打开
- 微星主板怎样设置u盘打开,教您微星主板怎样设置u盘打开
- 教你win7打印机共享怎样设置