LOADING...

加载过慢请开启缓存(浏览器默认开启)

loading

P-luminary

Jenkins

2025/6/15

持续集成及JenKins介绍

配套原版资料Jenkins持续集成从入门到精通.pdf

软件开发生命周期【需求分析、设计、实现、测试、进化

软件开发瀑布模型

瀑布模型是最著名和最常使用的软件开发模型。瀑布模型就是一系列的软件开发过程。它是由制造业繁 衍出来的。一个高度化的结构流程在一个方向上流动,有点像生产线一样。在瀑布模型创建之初,没有 其它开发的模型,有很多东西全靠开发人员去猜测,去开发。这样的模型仅适用于那些简单的软件开 发, 但是已经不适合现在的开发了。

优势 劣势
简单易用和理解 各个阶段的划分完全固定,阶段之间产生大量的文档,极大地 增加了工作量。
当前一阶段完成后,您只需要 去关注后续阶段 由于开发模型是线性的,用户只有等到整个过程的末期才能见 到开发成果,从而增加了开发风险
为项目提供了按阶段划分的检 查节点 瀑布模型的突出缺点是不适应用户需求的变化

软件开发敏捷开发模型

敏捷开发(Agile Development) 的核心是迭代开发(Iterative Development) 与 增量开发 (Incremental Development) 。

==何为迭代开发?== 对于大型软件项目,传统的开发方式是采用一个大周期(比如一年)进行开发,整个过程就是一次”大 开发”;迭代开发的方式则不一样,它将开发过程拆分成多个小周期,即一次”大开发”变成多次”小开 发”,每次小开发都是同样的流程,所以看上去就好像重复在做同样的步骤。 举例来说,SpaceX 公司想造一个大推力火箭,将人类送到火星。但是,它不是一开始就造大火箭,而 是先造一个最简陋的小火箭 Falcon 1。结果,第一次发射就爆炸了,直到第四次发射,才成功进入轨 道。然后,开发了中型火箭 Falcon 9,九年中发射了70次。最后,才开发 Falcon 重型火箭。如果 SpaceX 不采用迭代开发,它可能直到现在还无法上天。

==何为增量开发?== 软件的每个版本,都会新增一个用户可以感知的完整功能。也就是说,按照新增功能来划分迭代。 举例来说,房产公司开发一个10栋楼的小区。如果采用增量开发的模式,该公司第一个迭代就是交付一 号楼,第二个迭代交付二号楼……每个迭代都是完成一栋完整的楼。而不是第一个迭代挖好10栋楼的地 基,第二个迭代建好每栋楼的骨架,第三个迭代架设屋顶…..

敏捷开发如何迭代?
虽然敏捷开发将软件开发分成多个迭代,但是也要求,每次迭代都是一个完整的软件开发周期,必须按 照软件工程的方法论,进行正规的流程管理。

敏捷开发有什么好处?

==早期交付== 敏捷开发的第一个好处,就是早期交付,从而大大降低成本。 还是以上一节的房产公司为例,如果按照 传统的”瀑布开发模式”,先挖10栋楼的地基、再盖骨架、然后架设屋顶,每个阶段都等到前一个阶段完 成后开始,可能需要两年才能一次性交付10栋楼。也就是说,如果不考虑预售,该项目必须等到两年后 才能回款。 敏捷开发是六个月后交付一号楼,后面每两个月交付一栋楼。因此,半年就能回款10%,后 面每个月都会有现金流,资金压力就大大减轻了。

==降低风险== 敏捷开发的第二个好处是,及时了解市场需求,降低产品不适用的风险。 请想一想,哪一种情况损失比 较小:10栋楼都造好以后,才发现卖不出去,还是造好第一栋楼,就发现卖不出去,从而改进或停建后面9栋楼

持续集成

持续集成( Continuous integration , 简称 CI )指的是,频繁地(一天多次)将代码集成到主干。 持续集成的目的,就是让产品可以快速迭代,同时还能保持高质量。它的核心措施是,代码集成到主干 之前,必须通过自动化测试。只要有一个测试用例失败,就不能集成。 通过持续集成, 团队可以快速的从一个功能到另一个功能,简而言之,敏捷软件开发很大一部分都要归 功于持续集成

持续集成的流程
提交

流程的第一步,是开发者向代码仓库提交代码。所有后面的步骤都始于本地代码的一次提交 (commit)。

测试(第一轮)

代码仓库对commit操作配置了钩子(hook),只要提交代码或者合并进主干,就会跑自动化测试。

构建

通过第一轮测试,代码就可以合并进主干,就算可以交付了。 交付后,就先进行构建(build),再进入第二轮测试。所谓构建,指的是将源码转换为可以运行的实 际代码,比如安装依赖,配置各种资源(样式表、JS脚本、图片)等等。

测试(第二轮)

构建完成,就要进行第二轮测试。如果第一轮已经涵盖了所有测试内容,第二轮可以省略,当然,这时 构建步骤也要移到第一轮测试前面。

部署

过了第二轮测试,当前代码就是一个可以直接部署的版本(artifact)。将这个版本的所有文件打包( tar filename.tar * )存档,发到生产服务器。

回滚

一旦当前版本发生问题,就要回滚到上一个版本的构建结果。最简单的做法就是修改一下符号链接,指 向上一个版本的目录

持续集成的组成要素

  • 一个自动构建过程, 从检出代码、 编译构建、 运行测试、 结果记录、 测试统计等都是自动完成 的,无需人工干预。
  • 一个代码存储库,即需要版本控制软件来保障代码的可维护性,同时作为构建过程的素材库,一般使用SVN或Git。
  • 一个持续集成服务器,Jenkins 就是一个配置简单和使用方便的持续集成服务器

持续集成的好处

1、降低风险,由于持续集成不断去构建,编译和测试,可以很早期发现问题,所以修复的代价就少;
2、对系统健康持续检查,减少发布风险带来的问题;
3、减少重复性工作;
4、持续部署,提供可部署单元包;
5、持续交付可供使用的版本;
6、增强团队信心

JenKins介绍

Jenkins 是一款流行的开源持续集成(Continuous Integration)工具,广泛用于项目开发,具有自动化构建、测试和部署等功能。官网: http://jenkins-ci.org/
CI:持续集成(Continuous Integration)
CD:持续部署(Continuous ????)

Jenkins的特征:

  • 开源的 Java语言开发持续集成工具,支持持续集成,持续部署。
  • 易于安装部署配置:可通过 方便web界面配置管理。
  • 消息通知及测试报告:集成 yum安装,或下载war包以及通过docker容器等快速实现安装部署,可 RSS/E-mail通过RSS发布构建结果或当构建完成时通过e-mail通知,生成JUnit/TestNG测试报告。 Jenkins能够让多台计算机一起构建/测试。
  • 分布式构建:支持
  • 文件识别: Jenkins能够跟踪哪次构建生成哪些jar,哪次构建使用哪个版本的jar等。
  • 丰富的插件支持:支持扩展插件,你可以开发适合自己团队使用的工具,如 docker等

持续集成流程说明

服务器列表[统一使用CentOS7]

名称 IP地址 安装的软件
代码托管服务器 192.168.200.128 Gitlab-12.4.2
持续集成服务器 192.168.200.129 Jenkins-2.190.3,JDK1.8,Maven3.6.2,Git, SonarQube
应用测试服务器 192.168.66.102 JDK1.8,Tomcat8.5

Gitlab[团队个人版github]代码托管服务器安装

[lanyun_group / web_demo · GitLab] (http://192.168.200.128:82/lanyun_group/web_demo)

官网: https://about.gitlab.com/
GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的 web服务。

GitLab和GitHub一样属于第三方基于Git开发的作品,免费且开源(基于MIT协议),与Github类似, 可以注册用户,任意提交你的代码,添加SSHKey等等。不同的是,GitLab是可以部署到自己的服务器 上,数据库等一切信息都掌握在自己手上,适合团队内部协作开发,你总不可能把团队内部的智慧总放 在别人的服务器上吧?简单来说可把GitLab看作个人版的GitHub

  • 安装相关依赖

    yum -y install policycoreutils openssh-server openssh-clients postfix

  • 启动ssh服务&设置为开机启动

    systemctl enable sshd && sudo systemctl start sshd

  • 设置postfix开机自启,并启动,postfix支持gitlab发信功能

    systemctl enable postfix && systemctl start postfix

  • 开放ssh以及http服务,然后重新加载防火墙列表

    firewall-cmd –add-service=ssh –permanent
    firewall-cmd –add-service=http –permanent
    firewall-cmd –reload

    如果关闭防火墙就不需要做以上配置

  • 下载gitlab包,并且安装

    在线下载安装包
    wget https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el6/gitlab-ce-12.4.2-ce.0.el6.x 86_64.rpm 安装
    rpm -i gitlab-ce-12.4.2-ce.0.el6.x86_64.rpm

  • 修改gitlab配置

    vi /etc/gitlab/gitlab.rb

    修改gitlab访问地址和端口,默认为80,我们改为82
    external_url ‘ http://192.168.200.128:82'
    nginx[‘listen_port’] = 82

  • 重载配置及启动gitlab

    gitlab-ctl reconfigure
    gitlab-ctl restart ★★

  • 把端口添加到防火墙

firewall-cmd –zone=public –add-port=82/tcp –permanent
firewall-cmd –reload

启动成功后,看到以下修改管理员root密码的页面,修改密码后,然后登录即可

账号:root
密码:panchunyao123

Gitlab用户在组里面有5种不同权限:

Guest:可以创建issue、发表评论,不能读写版本库
Reporter:可以克隆代码,不能提交,QA、PM 可以赋予这个权限
Developer:可以克隆代码、开发、提交、push,普通开发可以赋予这个权限
Maintainer:可以创建项目、添加tag、保护分支、添加项目成员、编辑项目,核心开发可以赋予这个 权限 Owner:可以设置项目访问权限 - Visibility Level、删除项目、迁移项目、管理组成员,开发组组 长可以赋予这个权限

如果张三被管理员添加了Owner权限,那么张三就可以在idea里面通过gitlab里面的仓库地址 上传项目到这个web_demo了。

持续集成环境—Jenkins安装【我用了最新的Jenkins 2.440.1】

[Setup Wizard [Jenkins]] (http://192.168.200.129:8888/)

  • 安装JDK Jenkins需要依赖JDK,所以先安装JDK1.8

    yum install java-1.8.0-openjdk* -y

​ 安装目录为:/usr/lib/jvm

  • 获取jenkins安装包

下载页面: https://jenkins.io/zh/download/
安装文件:jenkins-2.190.3-1.1.noarch.rpm

  • 把安装包上传到192.168.66.101服务器,进行安装

    rpm -ivh jenkins-2.190.3-1.1.noarch.rpm

  • 进入文件目录

    cd /etc/sysconfig

  • 修改Jenkins配置

    vi jenkins

修改内容如下:

JENKINS_USER=”root”
JENKINS_PORT=”8888”

  • 启动Jenkins

    systemctl start jenkins ★★

    [root@localhost sysconfig]# systemctl start jenkins

  • 打开浏览器访问
    [192.168.200.129:8888] (http://192.168.200.129:8888/)
    注意:本服务器把防火墙关闭了,如果开启防火墙,需要在防火墙添加端口

  • 获取并输入admin账户密码

    [root@localhost sysconfig]# cat /var/lib/jenkins/secrets/initialAdminPassword

    [root@localhost sysconfig]# cat /root/.jenkins/secrets/initialAdminPassword
    3d83c355f83d4942847d4390fc5dcc39

  • 跳过插件安装 [点击第二个选择 然后上面全无]
    因为Jenkins插件需要连接默认官网下载,速度非常慢,而且经过会失败,所以我们暂时先跳过插件安装

综上操作

[root@localhost ~]# java -version
java version “21.0.1” 2023-10-17 LTS
Java(TM) SE Runtime Environment (build 21.0.1+12-LTS-29)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.1+12-LTS-29, mixed mode, sharing)
[root@localhost ~]# cd /usr/lib/jvm
[root@localhost jvm]# ll
总用量 0
lrwxrwxrwx. 1 root root 26 6月 15 16:27 java -> /etc/alternatives/java_sdk
lrwxrwxrwx. 1 root root 32 6月 15 16:27 java-1.8.0 -> /etc/alternatives/java_sdk_1.8.0
lrwxrwxrwx. 1 root root 40 6月 15 16:27 java-1.8.0-openjdk -> /etc/alternatives/java_sdk_1.8.0_openjdk
drwxr-xr-x. 9 root root 109 6月 15 16:27 java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64
lrwxrwxrwx. 1 root root 34 6月 15 16:27 java-openjdk -> /etc/alternatives/java_sdk_openjdk
lrwxrwxrwx. 1 root root 21 6月 15 16:27 jre -> /etc/alternatives/jre
lrwxrwxrwx. 1 root root 27 6月 15 16:27 jre-1.8.0 -> /etc/alternatives/jre_1.8.0
lrwxrwxrwx. 1 root root 35 6月 15 16:27 jre-1.8.0-openjdk -> /etc/alternatives/jre_1.8.0_openjdk
lrwxrwxrwx. 1 root root 49 6月 15 16:27 jre-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64 -> java-1.8.0-openjdk-1.8.0.262.b10-1.el7.x86_64/jre
lrwxrwxrwx. 1 root root 29 6月 15 16:27 jre-openjdk -> /etc/alternatives/jre_openjdk
[root@localhost jvm]# cd java-1.8.0-openjdk
[root@localhost java-1.8.0-openjdk]#

下一步:你只需要执行 升级Jenkins的升级流程

以下是你的专属升级操作指令合集(CentOS 7 专用):


🔧 1. 备份 Jenkins 数据(可选但强烈推荐)

sudo systemctl stop jenkins
sudo cp -r /var/lib/jenkins /var/lib/jenkins_backup_$(date +%F)
sudo cp /etc/sysconfig/jenkins /etc/sysconfig/jenkins.bak

🔧 2. 添加 Jenkins 官方 LTS Yum 源

sudo curl --silent --location https://pkg.jenkins.io/redhat-stable/jenkins.repo | sudo tee /etc/yum.repos.d/jenkins.repo > /dev/null
sudo rpm --import https://pkg.jenkins.io/redhat-stable/jenkins.io-2023.key

🔧 3. 升级 Jenkins

sudo yum clean all
sudo yum makecache
sudo yum upgrade jenkins -y

如果提示没有可升级的版本,执行以下命令强制重新安装最新版 Jenkins:

sudo yum remove jenkins -y
sudo yum install jenkins -y

⚠️ 不会丢失数据,因为 Jenkins 配置、任务、插件都在 /var/lib/jenkins 中,你已经备份过。


🔧 4. 启动 Jenkins 并查看状态

sudo systemctl daemon-reexec
sudo systemctl start jenkins
sudo systemctl status jenkins

🔧 5. 访问 Jenkins 页面

访问地址:

http://<你的服务器IP>:8888

🔧 6. 查看 Jenkins 新版本号

jenkins --version

或者登录网页右下角查看,应该是类似:

Jenkins ver. 2.440.1

查看 Jenkins 服务状态

请运行以下命令获取具体错误:

sudo systemctl status jenkins -l

步骤 3:重启 Jenkins 服务

sudo systemctl daemon-reexec
sudo systemctl restart jenkins
sudo systemctl status jenkins -l

🔗 插件下载地址与命令

以下命令会下载所有缺失依赖:

bash复制编辑# 进入 Jenkins 插件目录
cd /var/lib/jenkins/plugins

# 下载缺失的依赖插件
wget https://updates.jenkins.io/download/plugins/commons-lang3-api/3.13.0-62.v7d18e55f51e2/commons-lang3-api.hpi
wget https://updates.jenkins.io/download/plugins/ionicons-api/56.v1b_1c8c49374e/ionicons-api.hpi
wget https://updates.jenkins.io/download/plugins/caffeine-api/3.1.8-133.v17b_1ff2e0599/caffeine-api.hpi

持续集成环境—Jenkins插件管理【Manage Jenkins】

Jenkins本身不提供很多功能,我们可以通过使用插件来满足我们的使用。例如从Gitlab拉取代码,使用 Maven构建项目等功能需要依靠插件完成。接下来演示如何下载插件。

修改Jenkins插件下载地址

Jenkins国外官方插件地址下载速度非常慢,所以可以修改为国内插件地址:
Jenkins->Manage Jenkins->Manage Plugins,点击Available
新版本:Jenkins → Manage Jenkins → Plugins → Avaliable plugins

去Jenkins默认的开发目录

这样做是为了把 Jenkins官方的插件列表下载到本地,接着修改地址文件,替换为国内插件地址

[root@localhost sysconfig]# cd /var/lib/jenkins/
[root@localhost jenkins]# cd updates/
[root@localhost updates]# ll
总用量 3064
-rw-r–r–. 1 root root 3125621 6月 15 16:38 default.json
-rw-r–r–. 1 root root 7976 6月 15 16:38 hudson.tasks.Maven.MavenInstaller

sed -i ‘s/http://updates.jenkins ci.org/download/https://mirrors.tuna.tsinghua.edu.cn/jenkins/g’ default.json && sed -i ‘s/http:// www.google.com/https:\/\/ www.baidu.com/g' default.json

最后,Manage Plugins点击Advanced,把Update Site改为国内插件下载地址

https://mirrors.tuna.tsinghua.edu.cn/jenkins/updates/update-center.json

http://192.168.200.129:8888/restart 重启Jenkins

下载中文汉化插件

http://192.168.66.101:8888/restart ,重启Jenkins。 Jenkins->Manage Jenkins->Manage Plugins,点击Available,搜索”Chinese”

开启权限全局安全配置

在Security中的授权策略切换为 “Role-Based Strategy”,保存

【从插件市场上下载下来的 可以直接通过MobaXterm 放在/var/lib/jenkins/plugins/ 然后重启Jenkins】

创建角色

在系统管理页面进入 Manage and Assign Roles;点击”Manage Roles”

里面的Global roles(全局角色):管理员等高级用户可以创建基于全局的角色 Project roles(项目角色): 针对某个或者某些项目的角色 Slave roles(奴隶角色):节点相关的权限

我们添加以下三个角色:【一个基础角色 两个项目角色

  • baseRole :该角色为全局角色。这个角色需要绑定Overall下面的Read权限,是为了给所有用户绑 定最基本的Jenkins访问权限。注意:如果不给后续用户绑定这个角色,会报错误:用户名 is missing the Overall/Read permission
  • role1 :该角色为项目角色(下面的Item roles)。使用正则表达式绑定” itcast.* “,意思是只能操作itcast开头的项目。
  • role2 :该角色也为项目角色。绑定”itheima.*”,意思是只能操作itheima开头的项目。

创建用户

在系统管理页面进入 Manage Users
用户一:用户名:eric 密码:123456
用户二:用户名:JacK 密码:123456

给用户分配角色

系统管理页面进入Manage and Assign Roles,点击Assign Roles
绑定规则如下:

  • eric 用户分别绑定baseRole和role1角色
  • jack 用户分别绑定baseRole和role2角色

创建项目测试权限

以itcast管理员账户创建两个项目,分别为itcast01和itheima01

结果为:

  • eric 用户登录,只能看到itcast01项目
  • jack 用户登录,只能看到itheima01项目

持续集成环境—Jenkins凭证管理

凭据可以用来存储需要密文保护的数据库密码、Gitlab密码信息、Docker私有仓库密码等,以便 Jenkins可以和这些第三方的应用进行交互

安装Credentials Binding插件

要在Jenkins使用凭证管理功能,需要安装Credentials Binding插件
安装插件后,左边多了”凭证“菜单,在这里管理所有凭证 [新版是在Security栏有凭证管理]
进入凭据后点击Stores scoped to Jenkins的**全局**

可以添加的凭证有 5种:
  • Username with password :用户名和密码
  • SSH Username with private key: 使用SSH用户和密钥
  • Secret file:需要保密的文本文件,使用时Jenkins会将文件复制到一个临时目录中,再将文件路径 设置到一个变量中,等构建结束后,所复制的Secret file就会被删除。
  • Secret text :需要保存的一个加密的文本串,如钉钉机器人或Github的api token【k8s也会用】
  • Certificate :通过上传证书文件的方式
阅读全文

OpenCV

2025/3/30

图形相关处理

OpenCV是应用广泛的开源图像处理库,我们以其为基础,介绍相关的图像处理方法:包括基本的图像处理方法:几何变换,形态学变换,围伦平滑,直方围操作,板匹配,霍夫变换等;特征提取和描述方法:理解角点特征,Harris和Shi-TomaS算法,SIFT/SURF算法,Fast算法,ORB算法等;还有OpenCV在视频操作中的应用,最后的案例是使用OpenCV进行人险检测。

在第一次世界大战后,1921年美国科学家发明了BartlaneSystem,并从伦敦传到纽约传输了第一幅数字图像,其亮度用离散数值表示,:将图片编码成5个灰度级,如下图所示,通过海底电缆进行传输。在发送端图片被编码并使用打孔带记录,通过系统传输后在接收方使用特殊的打印机恢复成图像。

1950年左右,计算机被发明,数字图像处理学科正式诞生

位数

计算机采用0/1编码的系统,数字图像也是利用0/1来记录信息,我们平常接触的图像都是8位数图像,包含0~255灰度,其中0,代表最黑,1,表示最白。

二值图像:

一幅二值图像的二维矩阵仅由0、1两个值构成,“0”代表黑色,“1”代白色。由于每一像素(矩阵中每一元素)取值仅有0、1两种可能,所以计算机中二值图像的数据类型通常为1个二进制位。二值图像通常用于文字、线条图的扫描识别(OCR)和掩膜图像的存储,

灰度图:

每个像素只有一个采样颜色的图像,这类图像通常显示为从最暗黑色到最亮的白色的灰度,尽管理论上这个采样可以任何颜色的不同深浅,甚至可以是不同亮度上的不同颜色。灰度图像与黑白图像不同,在计算机图像领域中黑白图像只有黑色与白色两种颜色;但是,灰度图像在黑色与白色之间还有许多级的颜色深度。灰度图像经常是在单个电磁波频谱如可见光内测量每个像素的亮度得到的,用于显示的灰度图像通常用每个采样像素8位的非线性尺度来保存,这样可以有256级灰度(如果用16位,则有65536级)。

彩色图:

每个像素通常是由红(R)、绿(G)、蓝(B)三个分量来表示的,分量介干(0,255)。RGB图像与索引图像一样都可以用来表示彩色图像。与索引图像一样,它分别用红(R)、绿(G)、蓝(B)三原色的组合来表示每个像素的颜色。但与索引图像不同的是,RGB图像每一个像素的颜色值(由RGB三原色表示)直接存放在图像矩阵中,由于每一像素的颜色需由R、G、B三个分量来表示,M、N分别表示图像的行列数,三个MxN的二维矩阵分别表示各个像素的R、G、B三个颜色分量。RGB图像的数据类型一般为8位无符号整形,通常用于表示和存放真彩色图像。

1. 图像是什么

图:物体反射或透射光的分布
像:人的视觉系统所接受的图在人脑中所形版的印象或认识

2. 模拟图像和数字图像

模拟图像:连续存储的数据
数字图像:分级存储的数据

3. 数字图像

位数:图像的表示,常见的就是8位分类:二值图像,灰度图像和彩色图像

OpenCV的优势:
1. 编程语言

OpenCV基于C++实现,同时提供python,Ruby,Matlab等语言的接口。OpenCV-Python是OpenCV的Python API,结合了OpenCV C++ API和Python语言的最佳特性。

2. 跨平台

可以在不同的系统平台上使用,包括Windows,Linux,OSX,Android和i0S。基于CUDA和OpenCl的高速GPU操作接口也在积极开发中

3. 活跃的开发团队
4. 丰富的AP!

同时添加了对深度学习的支持。完善的传统计算机视觉算法,涵盖主流的机器学习算法,

OpenCV-Python

OpenCV-Python是一个Python绑定库,旨在解决计算机视觉问题。
Python是一种由Guido van Rossum开发的通用编程语言,它很快就变得非常流行,主要是因为它的简单性和代码可读性。它使程序员能够用更少的代码行表达思想,而不会降低可读性

与C/C++等语言相比,Python速度较慢。也就是说,Python可以使用C/C++轻松扩展,这使我们可以在C/C++中编写计算密集型代码,并创建可用作Python模块的Python包装器。这给我们带来了两个好处:首先,代码与原始C/C++代码一样快(因为它是在后台工作的实际C++代码),其次,在Python中编写代码比使用C/C++更容易。OpenCV-Python是原始OpenCVC++实现的Python包装器。

OpenCV-Python使用Numpy,这是一个高度优化的数据库操作库,具有MATLAB风格的语法。所有OpenCV数组结构都转换为Numpy数组。这也使得与使用Numpy的其他库(如SciPy和Matplotib)集成更容易。

OpenCV部署方法

安装OpenCV之前需要先安装numpy, matplotlibJ
创建Python虚拟环境cv,在cv中安装即可
先安装OpenCV-Python,由于一些经典的算法被申请了版权,新版本有很大的限制,所以选用3.4.3以下的版

OpenCV模块简介

其中core、highgui、imgproc是最基础的模块,该课程主要是围绕这几个模块展开的,分别介绍如下:

  • core模块实现了最核心的数据结构及其基本运算,如绘图函数、数组操作相关函数等。

  • highgui模块实现了视频与图像的读取、显示、存储等接口。

  • imgproc模块实现了图像处理的基础方法,包括图像滤波、图像的几何变换、平滑、阈值分割、形态学处理、边缘检测、目标检测、运动分析和对象跟踪等。

    对于图像处理其他更高层次的方向及应用,penCV也有相关的模块实现

  • features2d模块用于提取图像特征以及特征匹配,nonfree模块实现了一些专利算法,如sift特征

  • objdetect模块实现了一些目标检测的功能,经典的基于Haar、LBP特征的人脸检测,基于HOG的行人、汽车等目标检测,分类器使用Cascade Classification(级联分类)和Latent SVM等。

  • stitching模块实现了图像拼接功能。

  • FLANN模块(Fast Library for Approximate Nearest Neighbors),包含快速近似最近邻搜索FLANN和聚类Clustering算法。

  • mI模块机器学习模块(SVM,决策树,Boosting等等)

  • photo模块包含图像修复和图像去噪两部分。

  • video模块针对视频处理,如背景分离,前景检测、对象跟踪等。

  • calib3d模块即Calibration(校准)3D,这个模块主要是相机校准和三维重建相关的内容。包含了基本的多视角几何算法,单个立体摄像头标定,物体姿态估计,立体相似性算法,3D信息的重建等等。

  • G-API模块包含超高效的图像处理pipeline引整

OpenCV基本操作

本章主要介绍图像的基础操作,包括:

  • 图像的IO操作,读取和保存方法
  • 在图像上绘制几何图形
  • 怎么获取图像的属性
  • 怎么访问图像的像素,进行通道分离,合并等
  • 怎么实现颜色空间的变换
  • 图像的算术运算
图像的基础操作
  • 掌握图像的读取和保存方法
  • 能够使用OpenCV在图像上绘制几何图形
  • 能够访问图像的像素
  • 能够获取图像的属性,并进行通道的分离和合并
  • 能够实现颜色空间的变换

图像的IO操作

这里我们会给大家介绍如何读取图像,如何显示图像和如何保存图像。

1.1 读取图像

  1. API
cv.imread()

参数:

  • 要读取的图像

  • 读取方式的标志

    • cv.IMREAD*COLOR:以彩色模式加载图像,任何图像的透明度都将被忽略。这是默认参数。

    • cv.IMREAD*GRAYSCALE:以灰度模式加载图像

    • cv.IMREAD_UNCHANGED:包括alpha通道的加载图像模式。

      可以使用1、0或者-1来替代上面三个标志

  • 参考代码

    import numpy as np
    import cv2 as cv
    # 以灰度图的形式读取图像
    img = cv.imread('messi5.jpg',0)
    

注意:如果加载的路径有错误,不会报错,会返回一个None值

1.2显示图像

1 . API

cv.imshow()

参数:

  • 显示图像的窗口名称,以字符串类型表示
  • 要加载的图像

注意:在调用显示图像的API后,要调用cv.waitKey()给图像绘制留下时间,否则窗口会出现无响应情况,并且图像无法显示出来

另外我们也可使用matplotlib对图像进行展示。

  1. 参考代码

    # opencv中显示
    cv.imshow('image',img)
    cv.waitKey(0) # 永远的等待下去
    # matplotlib中展示
    plt.imshow(img[:,:,::-1])
    

1.3 保存图像

  1. API

    cv.imwrite()
    

    参数:

    • 文件名,要保存在哪里
    • 要保存的图像
  2. 参考代码

    cv.imwrite('messigray.png',img)
    

1.4 总结

我们通过加载灰度图像,显示图像,如果按’s’并退出则保存图像,或者按ESC键直接退出而不保存。

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1 读取图像
img = cv.imread('messi5.jpg',0)
# 2 显示图像
# 2.1 利用opencv展示图像
cv.imshow('image',img)
# 2.2 在matplotplotlib中展示图像【RGB通道】  后面要反转图像去显示
plt.imshow(img[:,:,::-1])
plt.title('匹配结果'), plt.xticks([]), plt.yticks([])
plt.show()
k = cv.waitKey(0)
# 3 保存图像
cv.imwrite('messigray.png',img)

2 绘制几何图形

2.1 绘制直线

cv.line(img,start,end,color,thickness)

参数:

  • img:要绘制直线的图像
  • Start,end: 直线的起点和终点
  • color: 线条的颜色
  • Thickness: 线条宽度

2.2 绘制圆形

cv.circle(img,centerpoint, r, color, thickness)

参数:

  • img:要绘制圆形的图像
  • Centerpoint, r: 圆心和半径
  • color: 线条的颜色
  • Thickness: 线条宽度,为-1时生成闭合图案并填充颜色

2.3 绘制矩形

cv.rectangle(img,leftupper,rightdown,color,thickness)

参数:

  • img:要绘制矩形的图像
  • Leftupper, rightdown: 矩形的左上角和右下角坐标
  • color: 线条的颜色
  • Thickness: 线条宽度

2.4 向图像中添加文字

cv.putText(img,text,station, font, fontsize,color,thickness,cv.LINE_AA)

参数:

  • img: 图像
  • text:要写入的文本数据
  • station:文本的放置位置
  • font:字体
  • Fontsize :字体大小

2.5 效果展示

我们生成一个全黑的图像,然后在里面绘制图像并添加文字

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1 创建一个空白的图像
img = np.zeros((512,512,3), np.uint8)
# 2 绘制图形
cv.line(img,(0,0),(511,511),(255,0,0),5)
cv.rectangle(img,(384,0),(510,128),(0,255,0),3)
cv.circle(img,(447,63), 63, (0,0,255), -1)
font = cv.FONT_HERSHEY_SIMPLEX
cv.putText(img,'OpenCV',(10,500), font, 4,(255,255,255),2,cv.LINE_AA)
# 3 图像展示
plt.imshow(img[:,:,::-1])
plt.title('匹配结果'), plt.xticks([]), plt.yticks([])
plt.show()

3 获取并修改图像中的像素点

我们可以通过行和列的坐标值获取该像素点的像素值。对于BGR图像,它返回一个蓝,绿,红值的数组。对于灰度图像,仅返回相应的强度值。使用相同的方法对像素值进行修改。

import numpy as np
import cv2 as cv
img = cv.imread('messi5.jpg')
# 获取某个像素点的值
px = img[100,100]
# 仅获取蓝色通道的强度值
blue = img[100,100,0]
# 修改某个位置的像素值
img[100,100] = [255,255,255]

4 获取图像的属性

图像属性包括行数,列数和通道数,图像数据类型,像素数等。

5 图像通道的拆分与合并

有时需要在B,G,R通道图像上单独工作。在这种情况下,需要将BGR图像分割为单个通道。或者在其他情况下,可能需要将这些单独的通道合并到BGR图像。你可以通过以下方式完成。

# 通道拆分
b,g,r = cv.split(img)
# 通道合并
img = cv.merge((b,g,r))

6 色彩空间的改变

OpenCV中有150多种颜色空间转换方法。最广泛使用的转换方法有两种,BGR↔Gray和BGR↔HSV。

API:

cv.cvtColor(input_image,flag)

参数:

  • input_image: 进行颜色空间转换的图像
  • flag: 转换类型
    • cv.COLOR_BGR2GRAY : BGR↔Gray
    • cv.COLOR_BGR2HSV: BGR→HSV

总结:

  1. 图像IO操作的API:

    cv.imread(): 读取图像

    cv.imshow():显示图像

    cv.imwrite(): 保存图像

  2. 在图像上绘制几何图像

    cv.line(): 绘制直线

    cv.circle(): 绘制圆形

    cv.rectangle(): 绘制矩形

    cv.putText(): 在图像上添加文字

  3. 直接使用行列索引获取图像中的像素并进行修改

  4. 图像的属性

    属性 API
    形状 img.shape
    图像大小 img.size
    数据类型 img.dtype
  5. 拆分通道:cv.split()

    通道合并:cv.merge()

  6. 色彩空间的改变

    cv.cvtColor(input_image,flag)

★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★

Jupyter Notebook增加临时环境

在Jupyter的terminal中

★ ★ PS C:\Users\Pluminary\Documents> pip config set global.index-url https://pypi.tuna.tsi
nghua.edu.cn/simple
Writing to C:\Users\Pluminary\AppData\Roaming\pip\pip.ini
★ ★ PS C:\Users\Pluminary\Documents> pip install opencv-python --cache-dir ./pipcache
>>
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple, https://pypi.ngc.nvidia.
com
Collecting opencv-python
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/a4/7d/f1c30a92854540bf789e9cd
5dde7ef49bbe63f855b85a2e6b3db8135c591/opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl
(39.5 MB)
     --------------------------------------- 39.5/39.5 MB 1.3 MB/s eta 0:00:00
★ ★ PS C:\Users\Pluminary\Documents> pip install matplotlib

★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★

算数操作【要用到上面的Jupyter Notebook】

学习目标

  • 了解图像的加法、混合操作

1.图像的加法注意:这里都要求两幅图像是相同大小的

你可以使用OpenCV的cv.add()函数把两幅图像相加,或者可以简单地通过numpy操作添加两个图像,如res = img1 + img2。两个图像应该具有相同的大小和类型,或者第二个图像可以是标量值。

注意:OpenCV加法和Numpy加法之间存在差异。OpenCV的加法是饱和操作,而Numpy添加是模运算

参考以下代码:

【OpenCV】
>>> x = np.uint8([250])
>>> y = np.uint8([10])
>>> print( cv.add(x,y) ) # 250+10 = 260 => 255
[[255]]

【Numpy】
>>> print( x+y )          # 250+10 = 260 % 256 = 4
[4]

这种差别在你对两幅图像进行加法时会更加明显。OpenCV 的结果会更好一点。所以我们尽量使用 OpenCV 中的函数。

代码:

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt

# 1 读取图像
img1 = cv.imread("view.jpg")
img2 = cv.imread("rain.jpg")

# 2 加法操作
img3 = cv.add(img1,img2) # cv中的加法
img4 = img1+img2 # 直接相加

# 3 图像显示
fig,axes=plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img3[:,:,::-1])
axes[0].set_title("cv中的加法")
axes[1].imshow(img4[:,:,::-1])
axes[1].set_title("直接相加")
plt.show()
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt

# 显示下雨的图片 ("./image/view.jpg")
rain = cv.imread("./rain.jpg")
plt.imshow(rain[:,:,::-1])

# 显示风景图片
view = cv.imread("./view.jpg")
plt.imshow(view[:,:,::-1])

# 用cv去做加法 显示雨中风景图
img1 = cv.add(rain,view)
plt.imshow(img1[:,:,::-1])

# 用普通做加法 显示雨中风景图
img2 = rain+view
plt.imshow(img2[:,:,::-1])

总结:OpenCV的加法效果要大于Numpy的加法效果

2.图像的混合注意:这里都要求两幅图像是相同大小的

这其实也是加法,但是不同的是两幅图像的权重不同,这就会给人一种混合或者透明的感觉。图像混合的计算公式如下:

g(x) = (1−α)f0(x) + αf1(x)

通过修改 α 的值(0 → 1),可以实现非常炫酷的混合。

现在我们把两幅图混合在一起。第一幅图的权重是0.7,第二幅图的权重是0.3。函数cv2.addWeighted()可以按下面的公式对图片进行混合操作。

dst = α⋅img1 + β⋅img2 + γ

这里γ取为零。

参考以下代码:

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt

# 1 读取图像
img1 = cv.imread("view.jpg")
img2 = cv.imread("rain.jpg")

# 2 图像混合
img3 = cv.addWeighted(img1,0.7,img2,0.3,0)

# 3 图像显示
plt.figure(figsize=(8,8))
plt.imshow(img3[:,:,::-1])
plt.show()
## 结合上方的代码
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt

# 显示下雨的图片
rain = cv.imread("./rain.jpg")
plt.imshow(rain[:,:,::-1])

# 显示风景图片
view = cv.imread("./view.jpg")
plt.imshow(view[:,:,::-1])

# 用cv去做加法 显示雨中风景图
img1 = cv.add(rain,view)
plt.imshow(img1[:,:,::-1])

# 用普通做加法 显示雨中风景图
img2 = rain+view
plt.imshow(img2[:,:,::-1])

# 做指定参数的混合加法
img3 = cv.addWeighted(view,0.5,rain,0.5,0)
plt.imshow(img3[:,:,::-1])

总结

  1. 图像加法:将两幅图像加载一起

    cv.add()

  2. 图像的混合:将两幅图像按照不同的比例进行混合

    cv.addweight()

注意:这里都要求两幅图像是相同大小的。

1 图像缩放[绝对尺寸+相对尺寸]

缩放是对图像的大小进行调整,即使图像放大或缩小。

  1. lAPI

    cv2.resize(src,dsize,fx=0,fy=0,interpolation=cv2.INTER_LINEAR)
    

    参数:

    • src : 输入图像

    • dsize: 绝对尺寸,直接指定调整后图像的大小

    • fx,fy: 相对尺寸,将dsize设置为None,然后将fx和fy设置为比例因子即可

    • interpolation:插值方法,

      插值 含义
      cv2.INTER_LINEAR 双线性插值法
      cv2.INTER_NEAREST 最近邻插值
      cv2.INTER_AREA 像素
      cv2.INTER_CUBIC 双三次插值
  2. 示例

  3. import cv2 as cv
    # 1. 读取图片
    img1 = cv.imread("./image/dog.jpeg")
    # 2.图像缩放
    # 2.1 绝对尺寸
    rows,cols = img1.shape[:2]
    res = cv.resize(img1,(2*cols,2*rows),interpolation=cv.INTER_CUBIC)
    
    # 2.2 相对尺寸
    res1 = cv.resize(img1,None,fx=0.5,fy=0.5)
    
    # 3 图像显示
    # 3.1 使用opencv显示图像(不推荐)
    cv.imshow("orignal",img1)
    cv.imshow("enlarge",res)
    cv.imshow("shrink)",res1)
    cv.waitKey(0)
    
    # 3.2 使用matplotlib显示图像
    fig,axes=plt.subplots(nrows=1,ncols=3,figsize=(10,8),dpi=100)
    axes[0].imshow(res[:,:,::-1])
    axes[0].set_title("绝对尺度(放大)")
    axes[1].imshow(img1[:,:,::-1])
    axes[1].set_title("原图")
    axes[2].imshow(res1[:,:,::-1])
    axes[2].set_title("相对尺度(缩小)")
    plt.show()
    
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
kids = cv.imread("./kids.jpg")
plt.imshow(kids[:,:,::-1])

# 绝对尺寸
rows,cols = kids.shape[:2]
rows =>> 374
cols =>> 500
res = cv.resize(kids,[2*rows,2*cols])
plt.imshow(res[:,:,::-1])
res.shape =>> (1000,748,3)

# 相对尺寸
res1 = cv.resize(kids,None,fx=0.5,fy=0.5)
plt.imshow(res1[:,:,::-1])
res1.shape =>> (187,250,3)

2 图像平移

图像平移将图像按照指定方向和距离,移动到相应的位置。

  1. API
cv.warpAffine(img,M,dsize)

参数:

  • img: 输入图像

  • M: 2∗∗3移动矩阵

    对于(x,y)处的像素点,要把它移动到(x+tx,y+tyx+t**x,y+t**y)处时,M矩阵应如下设置:

    M = [1 0 tx]
    [0 1 ty]
    注意:将MM设置为np.float32类型的Numpy数组。

  • dsize: 输出图像的大小

    注意:输出图像的大小,它应该是(宽度,高度)的形式。请记住,width=列数,height=行数。

  • 示例

需求是将图像的像素点移动(50,100)的距离:

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1. 读取图像
img1 = cv.imread("./image/image2.jpg")

# 2. 图像平移
rows,cols = img1.shape[:2]
M = M = np.float32([[1,0,100],[0,1,50]])# 平移矩阵
dst = cv.warpAffine(img1,M,(cols,rows))

# 3. 图像显示
fig,axes=plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img1[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(dst[:,:,::-1])
axes[1].set_title("平移后结果")
plt.show()

3 图像旋转[每一个像素点坐标进行修改]

图像旋转是指图像按照某个位置转动一定角度的过程,旋转中图像仍保持这原始尺寸。图像旋转后图像的水平对称轴、垂直对称轴及中心坐标原点都可能会发生变换,因此需要对图像旋转中的坐标进行相应转换。

同时我们要修正原点的位置,因为原图像中的坐标原点在图像的左上角,经过旋转后图像的大小会有所变化,原点也需要修正。

假设在旋转的时候是以旋转中心为坐标原点的,旋转结束后还需要将坐标原点移到图像左上角,也就是还要进行一次变换。

在OpenCV中图像旋转首先根据旋转角度和旋转中心获取旋转矩阵,然后根据旋转矩阵进行变换,即可实现任意角度和任意中心的旋转效果。
  1. API

    cv2.getRotationMatrix2D(center, angle, scale)
    

    参数:

    • center:旋转中心
    • angle:旋转角度
    • scale:缩放比例

    返回:

    • M:旋转矩阵

      调用cv.warpAffine完成图像的旋转

  2. 示例

    import numpy as np
    import cv2 as cv
    import matplotlib.pyplot as plt
    # 1 读取图像
    img = cv.imread("./image/image2.jpg")
    
    # 2 图像旋转
    rows,cols = img.shape[:2]
    # 2.1 生成旋转矩阵
    M = cv.getRotationMatrix2D((cols/2,rows/2),90,1)
    # 2.2 进行旋转变换
    dst = cv.warpAffine(img,M,(cols,rows))
    
    # 3 图像展示
    fig,axes=plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
    axes[0].imshow(img1[:,:,::-1])
    axes[0].set_title("原图")
    axes[1].imshow(dst[:,:,::-1])
    axes[1].set_title("旋转后结果")
    plt.show()
    
## 完整的Jupyter
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
kids = cv.imread("./kids.jpg")
plt.imshow(kids[:,:,::-1])

# 绝对尺寸
rows,cols = kids.shape[:2]
rows =>> 374
cols =>> 500
res = cv.resize(kids,[2*rows,2*cols])
plt.imshow(res[:,:,::-1])
res.shape

# 相对尺寸
res1 = cv.resize(kids,None,fx=0.5,fy=0.5)
plt.imshow(res1[:,:,::-1])
res1.shape

# 图像平移
rows,cols = kids.shape[:2]
M = np.float32([[1,0,100],[0,1,50]])
res2 = cv.warpAffine(kids,M,(cols,rows))
plt.imshow(res2[:,:,::-1])

# 图像旋转
M = cv.getRotationMatrix2D((cols/2,rows/2),45,1)
res3 = cv.warpAffine(kids,M,(cols,rows))
plt.imshow(res3[:,:,::-1])

4 仿射变换[类似ps的扭曲]

图像的仿射变换涉及到图像的形状位置角度的变化,是深度学习预处理中常到的功能,仿射变换主要是对图像的缩放,旋转,翻转和平移等操作的组合。

那什么是图像的仿射变换,如下图所示,图1中的点1, 2 和 3 与图二中三个点一一映射, 仍然形成三角形, 但形状已经大大改变,通过这样两组三点(感兴趣点)求出仿射变换, 接下来我们就能把仿射变换应用到图像中所有的点中,就完成了图像的仿射变换。

在OpenCV中,仿射变换的矩阵是一个2×3的矩阵:
M=[A B]=[a00 a01 b0]
=[a10 a11 b1]
其中左边的2×2子矩阵$A$是线性变换矩阵,右边的2×1子矩阵$B$是平移项:
A=[a00 a01] , B=[b0]
[a10 a11] [b1]
对于图像上的任一位置(x,y),仿射变换执行的是如下的操作:
Taffine = A[x] + B = M[x]
[y] [y]
[1]
需要注意的是,对于图像而言,宽度方向是x,高度方向是y,坐标的顺序和图像像素对应下标一致。所以原点的位置不是左下角而是右上角,y的方向也不是向上,而是向下。

在仿射变换中,原图中所有的平行线在结果图像中同样平行。为了创建这个矩阵我们需要从原图像中找到三个点以及他们在输出图像中的位置。然后cv2.getAffineTransform 会创建一个 2x3 的矩阵,最后这个矩阵会被传给函数 cv2.warpAffine。

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1 图像读取
img = cv.imread("./image/image2.jpg")

# 2 仿射变换
rows,cols = img.shape[:2]
# 2.1 创建变换矩阵
pts1 = np.float32([[50,50],[200,50],[50,200]])
pts2 = np.float32([[100,100],[200,50],[100,250]])
M = cv.getAffineTransform(pts1,pts2)
# 2.2 完成仿射变换
dst = cv.warpAffine(img,M,(cols,rows))

# 3 图像显示
fig,axes=plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(dst[:,:,::-1])
axes[1].set_title("仿射后结果")
plt.show()
## 代码承接上方
# 仿射变换
pts1 = np.float32([[56,65],[368,52],[389,390]])
pts2 = np.float32([[100,145],[300,100],[310,300]])

M = cv.getAffineTransform(pts1,pts2)
    M array([[ 6.40600025e-01, -1.02147944e-02,  6.47903603e+01],
               [-1.19267183e-01,  5.99126068e-01,  1.12735768e+02]])
res4 = cv.warpAffine(kids,M,(cols,rows))
plt.imshow(res4[:,:,::-1])

5 透射变换

透射变换是视角变化的结果,是指利用透视中心、像点、目标点三点共线的条件,按透视旋转定律使承影面(透视面)绕迹线(透视轴)旋转某一角度,破坏原有的投影光线束,仍能保持承影面上投影几何图形不变的变换。

在opencv中,我们要找到四个点,其中任意三个不共线,然后获取变换矩阵T,再进行透射变换。通过函数cv.getPerspectiveTransform找到变换矩阵,将cv.warpPerspective应用于此3x3变换矩阵。

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1 读取图像
img = cv.imread("./image/image2.jpg")
# 2 透射变换
rows,cols = img.shape[:2]
# 2.1 创建变换矩阵
pts1 = np.float32([[56,65],[368,52],[28,387],[389,390]])
pts2 = np.float32([[100,145],[300,100],[80,290],[310,300]])

T = cv.getPerspectiveTransform(pts1,pts2)
# 2.2 进行变换
dst = cv.warpPerspective(img,T,(cols,rows))

# 3 图像显示
fig,axes=plt.subplots(nrows=1,ncols=2,figsize=(10,8),dpi=100)
axes[0].imshow(img[:,:,::-1])
axes[0].set_title("原图")
axes[1].imshow(dst[:,:,::-1])
axes[1].set_title("透射后结果")
plt.show()
## 代码承接上方
# 投射变换
pst1 = np.float32([[56,65],[368,52],[28,387],[389,390]])
pst2 = np.float32([[100,145],[300,100],[80,290],[310,300]])
T = cv.getPerspectiveTransform(pst1,pst2)
T
res5 = cv.warpPerspective(kids,T,(cols,rows))
plt.imshow(res5[:,:,::-1])

6 图像金字塔

图像金字塔是图像多尺度表达的一种,最主要用于图像的分割,是一种以多分辨率来解释图像的有效但概念简单的结构。

图像金字塔用于机器视觉和图像压缩,一幅图像的金字塔是一系列以金字塔形状排列的分辨率逐步降低,且来源于同一张原始图的图像集合。其通过梯次向下采样获得,直到达到某个终止条件才停止采样。

金字塔的底部待处理图像的高分辨率表示,而顶部低分辨率的近似,层级越高,图像越小,分辨率越低

  1. API

    cv.pyrUp(img)       #对图像进行上采样
    cv.pyrDown(img)        #对图像进行下采样
    
  2. 示例

    import numpy as np
    import cv2 as cv
    import matplotlib.pyplot as plt
    # 1 图像读取
    img = cv.imread("./image/image2.jpg")
    # 2 进行图像采样
    up_img = cv.pyrUp(img)  # 上采样操作
    img_1 = cv.pyrDown(img)  # 下采样操作
    # 3 图像显示
    cv.imshow('enlarge', up_img)
    cv.imshow('original', img)
    cv.imshow('shrink', img_1)
    cv.waitKey(0)
    cv.destroyAllWindows()
    
## 代码承接上方
# 图像金字塔
 # 图像上采样
plt.imshow(kids[:,:,::-1])
imgup = cv.pyrUp(kids)
plt.imshow(imgup[:,:,::-1])
imgup2 = cv.pyrUp(imgup)
plt.imshow(imgup2[:,:,::-1])
 # 图像下采样
imgdown = cv.pyrDown(kids)
plt.imshow(imgdown[:,:,::-1])

总结

  1. 图像缩放:对图像进行放大或缩小

    cv.resize()

  2. 图像平移:

    指定平移矩阵后,调用cv.warpAffine()平移图像

  3. 图像旋转:

    调用cv.getRotationMatrix2D获取旋转矩阵,然后调用cv.warpAffine()进行旋转

  4. 仿射变换:

    调用cv.getAffineTransform将创建变换矩阵,最后该矩阵将传递给cv.warpAffine()进行变换

  5. 透射变换:

    通过函数cv.getPerspectiveTransform()找到变换矩阵,将cv.warpPerspective()进行投射变换

  6. 金字塔

    图像金字塔是图像多尺度表达的一种,使用的API:

    cv.pyrUp(): 向上采样

    cv.pyrDown(): 向下采样

形态学操作

学习目标

  • 理解图像的邻域,连通性
  • 了解不同的形态学操作:腐蚀,膨胀,开闭运算,礼帽和黑帽等,及其不同操作之间的关系

形态学转换是基于图像形状的一些简单操作。它通常在二进制图像上执行。腐蚀和膨胀是两个基本的形态学运算符。然后它的变体形式如开运算,闭运算,礼帽黑帽等。

膨胀的作用是将与物体接触的所有背景点合并到物体中,使目标增大,可添补目标中的孔洞。

API

   cv.dilate(img,kernel,iterations)

参数:

  • img: 要处理的图像
  • kernel: 核结构
  • iterations: 腐蚀的次数,默认是1
  1. 示例

我们使用一个5*5的卷积核实现腐蚀和膨胀的运算:

import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
# 1 读取图像
img = cv.imread("./image/image3.png")
# 2 创建核结构
kernel = np.ones((5, 5), np.uint8)

# 3 图像腐蚀和膨胀
erosion = cv.erode(img, kernel) # 腐蚀
dilate = cv.dilate(img,kernel) # 膨胀

# 4 图像展示
fig,axes=plt.subplots(nrows=1,ncols=3,figsize=(10,8),dpi=100)
axes[0].imshow(img)
axes[0].set_title("原图")
axes[1].imshow(erosion)
axes[1].set_title("腐蚀后结果")
axes[2].imshow(dilate)
axes[2].set_title("膨胀后结果")
plt.show()
阅读全文

python知识点

2025/3/30
项目01 python源码实例知识点

在控制台打出Icon表情:win+R → Wingdings → 字符码 0xcc 中间加个 f0 → 0xf0cc
大写字母转换为小写字母:str.upper()
小写字母转换为大写字母:str.lower()
station.reverse() #对高铁站列表反向输出
world = [“西北”, “中国”, “亚洲”, “世界”]
print(world[random.choice([0, 1, 2, 3])]
random.shuffle() 顺序打乱
random.sample(sqn,n) 从序列中选n个随机且不重复的元素
sdate=datetime.datetime.today().strftime(‘%m-%d %H:%M’)
sys.stdout.flush 刷新画面布
flag = random.choice([“+”, “-“]) # 随机产生”+”或 “-“号
ces = random.randint(1,100) # 随机生成一个数字
fl = f.readlines()
del fl[:8] #del删除切片(前8行数据)
fl = fl[1::3] #提取下标为1,步长为3的切片
str1 = ‘ ‘.join(fl) #join()函数分割文本数据
str1 = str1.replace(‘[QQ红包]请使用新版手机QQ查收红包。’,’’)
#滤除无用文本

文件操作对比
#打开文件
def button1():
  global file1
  file1=tk.filedialog.askopenfilename()
  txt_path1.set(file1)


#对比文件
def Diff():
  with open(file1) as f1,open(file2) as f2:
    text1 = f1.readlines()
    text2 = f2.readlines()
  d = difflib.HtmlDiff()
  with open('result1.html','w') as f:
    f.write(d.make_file(text1,text2))

a1=list(set(list1)) #字符串列表去重

print("".join([s for s in str1.splitlines(True) if s.strip()]))

这行代码的作用是打印字符串 str1 中非空行的内容,并且保留原有的换行符。下面是逐部分的解析:

str1.splitlines(True)
splitlines() 是字符串的方法,用于将字符串按行分割成列表。
参数 True 表示在分割时保留行尾的换行符。
这部分代码将 str1 按行分割成一个列表,每个元素是一行字符串,包括换行符。

[s for s in ... if s.strip()]
这是一个列表推导式,用于生成一个新的列表。
s for s in ... 表示对每个分割后的行字符串 s 进行迭代。

if s.strip() 是一个条件表达式,用于过滤掉空行或只包含空白字符的行。strip() 方法移除字符串两端的空白字符(包括空格、制表符、换行符等),如果 s.strip() 返回一个空字符串,表示 s 是一个空行或只包含空白字符。

"".join(...)
join() 是字符串的方法,用于将列表中的元素连接成一个字符串。这里使用空字符串 "" 作为连接符,表示不添加额外的字符。
这部分代码将列表推导式生成的列表中的所有非空行连接成一个单一的字符串,保留每行末尾的换行符。
print(...)
print() 函数用于输出结果到控制台。
这部分代码将最终连接好的字符串打印出来。
综合起来,这行代码的执行流程是:
----------------------------------------------------------------
将 str1 按行分割成列表,保留每行的换行符。
过滤掉列表中的空行或只包含空白字符的行。
将过滤后的非空行连接成一个字符串。
打印最终的结果。

#用线程控制自动切换单词
t = threading.Thread(target=autoChange)
t.start()

print("电影:",film)
print('判断演员是否本部电影的演员。回车确认“是”,输入任意键确认“不是”')
for i in range(6):                     # 6次竞猜
    new = random.choice(actall)        # 随机选择演员
    actall.remove(new)                 # 从演员库删除选择的演员,防止下次再次出现
    print(new)  # 显示竞猜演员
    # 从用户输入中获取一个字符串 并移除该字符串两端的空白字符
    num = input("").strip()  # 用户进行判断,选择回车还是其他键
    if not num:  # 选择回车,确认是该部电影主演
        if new not in acter:  # 如果该演员不在本部电影主演库里面
            count -= 3  # 积分减3分
            print("答错了,减三分!")
        else:  # 答对了
            count += 3  # 积分加3分
            print("答对了,加三分!")
        print("当前分数:", count)
    else:  # 选择其他键,确认不是该部电影主演
        if new not in acter:  # 如果该演员不在本部电影主演库里面
            count += 3  # 积分加3分
            print("答对了,加三分!")
        else:  # 答错了
            count -= 3  # 积分减3分
            print("答错了,减三分!")
        print("当前分数:", count)  # 输出当前积分

turtle(海龟)库是turtle绘图体系python的实现;
turtle绘图体系:1969年诞生,主要用于程序设计入门;
turtle库是python的标准库之一;属于入门级的图形绘制函数库;
turtle库绘制原理:有一只海龟在窗体正中心,在画布上游走,走过的轨迹形成了绘制的图形,海龟由程序控制,可以自由改变颜色、方向宽度等;

screen.onkeypress(None,”space”) #按空格键启动转盘

阅读全文

RAGFlow执行流程解析canvas

2025/3/19

Canvas.py

import logging
import json
from copy import deepcopy
from functools import partial

import pandas as pd

from agent.component import component_class
from agent.component.base import ComponentBase


class Canvas:
    """
    dsl = {
        "components": {
            "begin": {
                "obj":{
                    "component_name": "Begin",
                    "params": {},
                },
                "downstream": ["answer_0"],
                "upstream": [],
            },
            "answer_0": {
                "obj": {
                    "component_name": "Answer",
                    "params": {}
                },
                "downstream": ["retrieval_0"],
                "upstream": ["begin", "generate_0"],
            },
            "retrieval_0": {
                "obj": {
                    "component_name": "Retrieval",
                    "params": {}
                },
                "downstream": ["generate_0"],
                "upstream": ["answer_0"],
            },
            "generate_0": {
                "obj": {
                    "component_name": "Generate",
                    "params": {}
                },
                "downstream": ["answer_0"],
                "upstream": ["retrieval_0"],
            }
        },
        "history": [],
        "messages": [],
        "reference": [],
        "path": [["begin"]],
        "answer": []
    }
    """
    # dsl是组件流的JSON格式字符串 定义了组件的连接关系
    # tenant_id多租户管理
    # component存储所有组件实例
    def __init__(self, dsl: str, tenant_id=None):
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.components = {}
        # 如果提供了dsl则解析JSON 否则使用默认的DSL结构(包含 Begin 组件)
        self.dsl = json.loads(dsl) if dsl else {
            "components": {
                "begin": {
                    "obj": {
                        "component_name": "Begin",
                        "params": {
                            "prologue": "Hi there!"
                        }
                    },
                    "downstream": [],
                    "upstream": [],
                    "parent_id": ""
                }
            },
            "history": [],
            "messages": [],
            "reference": [],
            "path": [],
            "answer": []
        }
        self._tenant_id = tenant_id
        self._embed_id = ""
        self.load()

    # 加载dsl
    def load(self):
      # 解析dsl["components"]存储组件实例 并检查必要组件是否存在
        # 组件字典存入self.components
        self.components = self.dsl["components"]
        # cpn_nms记录组件名称集合
        cpn_nms = set([])

        # 遍历组件 提取名称
        for k, cpn in self.components.items():
            cpn_nms.add(cpn["obj"]["component_name"])
        # 确保Begin和Answer组件存在 否则抛出错误
        assert "Begin" in cpn_nms, "There have to be an 'Begin' component."
        assert "Answer" in cpn_nms, "There have to be an 'Answer' component."

        # for循环便利self.components这个字典
        # 其中键k是组件标识符 cpn是一个包含组件详细信息的字典
        for k, cpn in self.components.items():
            # 将组件名称添加到cpn_nms集合中
            cpn_nms.add(cpn["obj"]["component_name"])
            # 通过component_class动态创建 拼接param
            param = component_class(cpn["obj"]["component_name"] + "Param")()
            # 更新param实例的属性 将cpn["obj"]["params"]字典中的键值对应用到param实例上
            param.update(cpn["obj"]["params"])
            param.check()
          # 使用component_class动态创建了一个新的组件实例 并将其赋值给cpn["obj"]
            # 新实例的创建使用了当前类实例self、组件标识符k和参数实例param
            cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
            # 检查当前组件的名称是否为"Categorize"
            if cpn["obj"].component_name == "Categorize":
                # 组件是"Categorize" 则遍历param.category_description字典 这个字典包含了分类描述
                for _, desc in param.category_description.items():
                    # 这两行代码检查desc["to"](可能是下游组件的标识符)是否已经在cpn["downstream"]列表中
                    if desc["to"] not in cpn["downstream"]:
                        # 如果不在,就将其添加到列表中
                        cpn["downstream"].append(desc["to"])
        # 将self.dsl["path"]的值赋给self.path属性
        self.path = self.dsl["path"]
        self.history = self.dsl["history"]
        self.messages = self.dsl["messages"]
        self.answer = self.dsl["answer"]
        self.reference = self.dsl["reference"]
        self._embed_id = self.dsl.get("embed_id", "")

    # 定义了类的字符串表示形式 通常用于打印对象时显示的信息
    # 这几行代码将类的属性值赋给self.dsl字典中的相应键
    def __str__(self):
        self.dsl["path"] = self.path
        self.dsl["history"] = self.history
        self.dsl["messages"] = self.messages
        self.dsl["answer"] = self.answer
        self.dsl["reference"] = self.reference
        self.dsl["embed_id"] = self._embed_id
        # 创建一个新的字典dsl 其中包含一个键components 其值为一个空字典
        dsl = {
            "components": {}
        }
        # 遍历self.dsl字典的键 如果键不是components 则将其值深拷贝到新的dsl字典中
        for k in self.dsl.keys():
            if k in ["components"]:
                continue
            dsl[k] = deepcopy(self.dsl[k])

        # 遍历self.components字典 将每个组件的信息深拷贝到dsl字典的components键下
        # 对于obj键 将其值转换为字符串后再解析为JSON
        for k, cpn in self.components.items():
            if k not in dsl["components"]:
                dsl["components"][k] = {}
            for c in cpn.keys():
                if c == "obj":
                    dsl["components"][k][c] = json.loads(str(cpn["obj"]))
                    continue
                dsl["components"][k][c] = deepcopy(cpn[c])
        # 返回dsl字典的JSON字符串表示形式 ensure_ascii=False确保非ASCII字符可以正确显示
        return json.dumps(dsl, ensure_ascii=False)

    # 将类的属性重置为空列表
    def reset(self):
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.reference = []
        # 遍历self.components字典 并调用每个组件的reset方法
        for k, cpn in self.components.items():
            self.components[k]["obj"].reset()
        # 将_embed_id属性重置为空字符串
        self._embed_id = ""

    # 这个方法用于根据组件ID获取组件名称
    def get_component_name(self, cid):
        # 遍历self.dsl字典中的graph键下的nodes列表
        # 如果找到匹配的ID 则返回相应的组件名称
        for n in self.dsl["graph"]["nodes"]:
            if cid == n["id"]:
                return n["data"]["name"]
        # 如果没有找到匹配的ID 则返回空字符串
        return ""

    def run(self, **kwargs):
        # 如果self.answer列表不为空
        if self.answer:
            # 用于存储接下来要执行的组件ID
            cpn_id = self.answer[0]
            # 存完就从列表中移除
            self.answer.pop(0)
            try:
                # 尝试执行 self.components 字典中对应 cpn_id 的组件对象的 run 方法
                # 并传入 self.history 和 **kwargs 作为参数
                ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
                # 如果执行过程中发生异常
            except Exception as e:
                # 捕获该异常并使用 ComponentBase.be_output 方法处理异常信息
                ans = ComponentBase.be_output(str(e))
            # 将执行过的组件ID添加到 self.path 列表的最后一个元素中
            # self.path 似乎用于记录执行路径
            self.path[-1].append(cpn_id)
            # 如果 kwargs 中包含关键字 “stream”
            # 则假设 ans 是一个生成器 遍历它并逐个 yield 出来
            # 如果不是 则直接 yield ans
            if kwargs.get("stream"):
                for an in ans():
                    yield an
            else:
                yield ans
            return
        # 如果 self.path 为空 则执行名为 “begin” 的组件的 run 方法 并将 “begin” 添加到 self.path 中
        if not self.path:
            self.components["begin"]["obj"].run(self.history, **kwargs)
            self.path.append(["begin"])
        # 向 self.path 添加一个空列表 为接下来的组件执行做准备
        self.path.append([])

        ran = -1
        waiting = []
        without_dependent_checking = []

        # 当有下游组件downstream时
        # 它接受一个组件列表 cpns 作为参数
        def prepare2run(cpns):
            # 这些变量在函数外部定义 但在函数内部可以被修改
            nonlocal ran, ans
            # 遍历 cpns 列表中的每个组件ID c
            # 如果 c 已经是 self.path 中最后一个列表的最后一个元素 则跳过当前循环
            for c in cpns:
                if self.path[-1] and c == self.path[-1][-1]:
                    continue
                # 从 self.components 字典中获取组件ID c 对应的组件对象
                cpn = self.components[c]["obj"]
                # 如果组件的名称是 “Answer” 则将组件ID c 添加到 self.answer 列表中
                if cpn.component_name == "Answer":
                    self.answer.append(c)
                else:
                    # 如果组件不是 “Answer” 则记录调试信息 检查组件是否有未执行的依赖组件
                    # 如果有 将组件ID添加到 waiting 列表 并跳过当前循环
                    logging.debug(f"Canvas.prepare2run: {c}")
                    if c not in without_dependent_checking:
                        cpids = cpn.get_dependent_components()
                        if any([cc not in self.path[-1] for cc in cpids]):
                            if c not in waiting:
                                waiting.append(c)
                            continue
                    yield "*'{}'* is running...🕞".format(self.get_component_name(c))
                    # 如果组件是 “Iteration” 则获取其开始组件 并检查是否已经结束 如果没有结束 则更新 cpn 和 c 为开始组件
                    if cpn.component_name.lower() == "iteration":
                        st_cpn = cpn.get_start()
                        assert st_cpn, "Start component not found for Iteration."
                        if not st_cpn["obj"].end():
                            cpn = st_cpn["obj"]
                            c = cpn._id
                    # 尝试运行组件
                    try:
                        ans = cpn.run(self.history, **kwargs)
                    # 并捕获任何异常 如果发生异常 记录错误并更新 ran 然后重新抛出异常
                    except Exception as e:
                        logging.exception(f"Canvas.run got exception: {e}")
                        self.path[-1].append(c)
                        ran += 1
                        raise e
                    # 将组件ID c 添加到 self.path 的最后一个列表中
                    self.path[-1].append(c)
            # 增加 ran 计数器的值
            ran += 1
        """
        接下来的代码块处理下游组件的运行 循环检测 以及组件输出的合并
        这部分代码涉及到更复杂的逻辑,如处理循环、条件分支、组件输出等
        """
        # 获取上一个组件的下游组件列表
        downstream = self.components[self.path[-2][-1]]["downstream"]
        # 如果没有下游组件 但存在父组件ID 则合并父组件和当前组件的输出
        if not downstream and self.components[self.path[-2][-1]].get("parent_id"):
            cid = self.path[-2][-1]
            pid = self.components[cid]["parent_id"]
            o, _ = self.components[cid]["obj"].output(allow_partial=False)
            oo, _ = self.components[pid]["obj"].output(allow_partial=False)
            self.components[pid]["obj"].set_output(pd.concat([oo, o], ignore_index=True).dropna())
            downstream = [pid]
        # 递归调用 prepare2run 函数处理下游组件 并生成运行状态信息
        for m in prepare2run(downstream):
            yield {"content": m, "running_status": True}

        # 在 ran 的值有效时 处理循环和组件运行
        # ran 的值在 0 和 self.path[-1] 的长度之间时继续执行
        while 0 <= ran < len(self.path[-1]):
            logging.debug(f"Canvas.run: {ran} {self.path}")
            # 从 self.path 中获取当前要处理的组件ID cpn_id
            # 并通过 get_component 方法获取组件对象
            cpn_id = self.path[-1][ran]
            cpn = self.get_component(cpn_id)
            # 如果当前组件没有下游组件、没有父组件ID 并且没有等待处理的组件 则退出循环
            if not any([cpn["downstream"], cpn.get("parent_id"), waiting]):
                break

            # 调用 _find_loop 方法检查是否存在循环 如果存在则抛出 OverflowError 异常
            loop = self._find_loop()
            if loop:
                raise OverflowError(f"Too much loops: {loop}")
            # 如果组件的名称是 “switch”、“categorize” 或 “relevant” 则处理这些特定类型的组件
            if cpn["obj"].component_name.lower() in ["switch", "categorize", "relevant"]:
                switch_out = cpn["obj"].output()[1].iloc[0, 0]
                assert switch_out in self.components, \
                    "{}'s output: {} not valid.".format(cpn_id, switch_out)
                # 递归调用 prepare2run 函数处理特定组件的输出 并生成运行状态信息 然后继续下一次循环
                for m in prepare2run([switch_out]):
                    yield {"content": m, "running_status": True}
                continue

            # 处理父组件输出合并
            downstream = cpn["downstream"]
            # 如果没有下游组件但有父组件ID 则合并父组件和当前组件的输出
            if not downstream and cpn.get("parent_id"):
                pid = cpn["parent_id"]
                _, o = cpn["obj"].output(allow_partial=False)
                _, oo = self.components[pid]["obj"].output(allow_partial=False)
                self.components[pid]["obj"].set_output(pd.concat([oo.dropna(axis=1), o.dropna(axis=1)], ignore_index=True))
                downstream = [pid]
            # 递归调用 prepare2run 函数处理下游组件 并生成运行状态信息
            for m in prepare2run(downstream):
                yield {"content": m, "running_status": True}

            # 如果 ran 的值等于或超过 self.path[-1] 的长度
            # 并且有等待处理的组件 则处理这些组件 并更新 ran 的值
            if ran >= len(self.path[-1]) and waiting:
                without_dependent_checking = waiting
                waiting = []
                for m in prepare2run(without_dependent_checking):
                    yield {"content": m, "running_status": True}
                without_dependent_checking = []
                ran -= 1
        # 如果 self.answer 列表非空 处理答案组件
        if self.answer:
            cpn_id = self.answer[0]
            self.answer.pop(0)
            ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
            self.path[-1].append(cpn_id)
            # 如果 self.answer 列表为空 抛出异常 提示需要在流程末尾添加交互组件
            if kwargs.get("stream"):
                assert isinstance(ans, partial)
                for an in ans():
                    yield an
            else:
                yield ans

        else:
            raise Exception("The dialog flow has no way to interact with you. Please add an 'Interact' component to the end of the flow.")

    def get_component(self, cpn_id):
        return self.components[cpn_id]

    def get_tenant_id(self):
        return self._tenant_id
    # 定义一个名为 get_history 的方法 它接受一个参数 window_size 这个参数用于确定需要返回多少条历史记录
    def get_history(self, window_size):
        # 初始化一个空列表convs  用于存储格式化后的对话记录
        convs = []
        # 使用一个 for 循环遍历 self.history 的最后 window_size 条记录
        # self.history 应该是一个列表 其中每个元素都是一个包含角色和对象(对话内容)的元组
        """
            role 是对话中的角色,例如 “user” 或 “assistant”。
            obj 是与该角色相关的对话内容,它可能是一个列表或单个对象
        """
        for role, obj in self.history[window_size * -1:]:
            if isinstance(obj, list) and obj and all([isinstance(o, dict) for o in obj]):
                convs.append({"role": role, "content": '\n'.join([str(s.get("content", "")) for s in obj])})
            else:
                convs.append({"role": role, "content": str(obj)})
        return convs

    def add_user_input(self, question):
        self.history.append(("user", question))

    def set_embedding_model(self, embed_id):
        self._embed_id = embed_id

    def get_embedding_model(self):
        return self._embed_id
 # 定义一个名为 _find_loop 的方法 它接受一个可选参数 max_loops默认值为6 这个参数用于限制循环检测的次数
    def _find_loop(self, max_loops=6):
        # 获取 self.path 列表的最后一个元素(即当前路径)并将其反转 以便从后往前检查
        path = self.path[-1][::-1]
        # 如果路径的长度小于2 则没有足够的元素形成循环 因此直接返回 False
        if len(path) < 2:
            return False
        # 遍历路径,如果找到以 “answer” 或 “iterationitem” 开头的元素
        # 则从路径中移除该元素及其后面的所有元素
        # 因为这些元素可能表示循环的结束
        for i in range(len(path)):
            if path[i].lower().find("answer") == 0 or path[i].lower().find("iterationitem") == 0:
                path = path[:i]
                break
        # 再次检查路径长度 如果移除特定元素后长度小于2 则返回 False
        if len(path) < 2:
            return False
        # 使用一个循环来检查路径的不同子序列 loc 是子序列的长度
        # 从 2 开始 直到路径长度的一半
        for loc in range(2, len(path) // 2):
            # 创建一个子序列pat 它是路径的前loc个元素 用逗号连接
            pat = ",".join(path[0:loc])
            # 创建一个字符串 path_str 它是整个路径 用逗号连接
            path_str = ",".join(path)
            # 如果子序列pat的长度大于或等于整个路径字符串 ath_str的长度 则不可能形成循环 因此返回 False
            if len(pat) >= len(path_str):
                return False
            """
            使用一个 while 循环来检查 path_str 是否以 pat 开头 
            如果是 则减少 loop 计数 并从 path_str 中移除 pat 和随后的逗号
            如果 loop 计数降到 0 以下 说明找到了循环
            """
            loop = max_loops
            while path_str.find(pat) == 0 and loop >= 0:
                loop -= 1
                if len(pat)+1 >= len(path_str):
                    return False
                path_str = path_str[len(pat)+1:]
                # 如果找到了循环 则创建一个循环模式的字符串 使用 " => " 作为分隔符
                # 并将路径中的每个元素的前半部分(在冒号之前的部分)连接起来
                # 最后 返回一个表示循环的模式字符串
            if loop < 0:
                pat = " => ".join([p.split(":")[0] for p in path[0:loc]])
                return pat + " => " + pat

        return False

    def get_prologue(self):
        return self.components["begin"]["obj"]._param.prologue

    def set_global_param(self, **kwargs):
        for k, v in kwargs.items():
            for q in self.components["begin"]["obj"]._param.query:
                if k != q["key"]:
                    continue
                q["value"] = v

    def get_preset_param(self):
        return self.components["begin"]["obj"]._param.query

    def get_component_input_elements(self, cpnnm):
        return self.components[cpnnm]["obj"].get_input_elements()
阅读全文

RAGFlow代理组件技术

2025/3/19

如果要自己创建一个组件并且使用

要创建一个新的组件类 + 自定义新的组件参数

agent/component/xxxx.py 在里面继承 ComponentParamBase + ComponentBase 类 这样就可以重用其中的大部分方法和逻辑
② 新组件类需要实现 _run 方法,这个方法会定义组件的具体业务逻辑

BaiduFanyiParam →→ ComponentBase → base.py
★★★ base.py【里面的output是输出为元组(DataFrame) 
再通过序列化转换为(字典)(def as_dict(self))JSON】 ★★★

    扩展性:不同的子类可以根据具体需求实现自己的 _run 方法
           不同的业务逻辑可能需要不同的数据处理方式和计算方法
    灵活配置:kwargs 参数提供了方法调用时灵活传递参数的能力,可以根据需要在不同情况下调整行为。
    调试和监控:通过 logging.debug 和 set_output 记录输入和输出,可以在调试和生产环境中跟踪程序的行为,并及时发现问题。
    
    def run(self, history, **kwargs):
        logging.debug("{}, history: {}, kwargs: {}".format(self, json.dumps(history, ensure_ascii=False),
                                                              json.dumps(kwargs, ensure_ascii=False)))
        self._param.debug_inputs = []
        try:
            res = self._run(history, **kwargs)
            self.set_output(res)
        except Exception as e:
            self.set_output(pd.DataFrame([{"content": str(e)}]))
            raise e

        return res

    def _run(self, history, **kwargs):
        raise NotImplementedError()
【自定义组件】BaiduFanyi →→ ComponentBase → abc.py 【Python的抽象基类(类似于Java的接口)】
确保某些类在继承抽象基类时遵循特定的协议 这在设计大型系统或者框架时非常有用
★ 接口定义:定义了组件必须实现的方法 => `_run` => 组件执行其任务的地方
★ 组件命名:通过 component_name 类属性,它提供了组件的名称,这有助于在系统中识别和管理组件
★ 运行逻辑:`_run` 方法是组件的核心,它包含了组件的执行逻辑,例如如何使用参数、如何处理输入、如何调用外部API等
★ 输入/输出处理:output → base.py 用于处理组件的输出,确保所有组件的输出格式一致
------------------------------------------------------------------------------
【自定义组件参数】BaiduFanyiParam →→ ComponentParamBase → base.py 【初始化+校验】
定义了百度翻译组件所需的参数,例如 appid、secret_key、trans_type 等,它还实现了一个 check 方法,用于验证参数的有效性。
★ 参数封装:它封装了组件运行所需的所有参数,使得参数管理更加集中方便
★ 参数验证:通过 check 方法,可以确保在组件运行前所有必要的参数都被正确设置,并且是有效的
★ 可扩展性:其他组件的参数类可以继承 ComponentParamBase 并根据需要添加或修改参数
import random
# 抽象基类模块 用于定义组件的基类
from abc import ABC
# 用于发送 HTTP 请求,与 百度翻译 API 交互
import requests
# 基础组件类,于构建此组件的父类,定义组件的基本结构和参数。
from agent.component.base import ComponentBase, ComponentParamBase
# 用于生成百度翻译 API 请求签名(加密哈希)
from hashlib import md5

# 百度翻译组件的参数定义类 继承ComponentParamBase
class BaiduFanyiParam(ComponentParamBase):
    """
    Define the BaiduFanyi component parameters.
    """

    def __init__(self):
        super().__init__()
   # appid 和 secret_key 是百度翻译 API 的 身份验证 信息(需要在百度翻译开放平台获取)
        self.appid = "xxx"
        self.secret_key = "xxx"
   # translate:普通翻译   fieldtranslate:专业领域翻译
        self.trans_type = 'translate'
        self.parameters = []
   # source_lang 翻译的源语言   target_lang 翻译的目标语言
        self.source_lang = 'auto'
        self.target_lang = 'auto'
   # domain 如果使用专业领域翻译,需指定领域(如 finance 表示 金融 领域)
        self.domain = 'finance'
# 检查参数合法性
    def check(self):
    # check_empty():确保 appid 和 secret_key 不能为空。
        self.check_empty(self.appid, "BaiduFanyi APPID")
        self.check_empty(self.secret_key, "BaiduFanyi Secret Key")
# check_valid_value():确保 trans_type、source_lang、target_lang、domain 的值在合法选项之内
        self.check_valid_value(self.trans_type, "Translate type", ['translate', 'fieldtranslate'])
# 以下是列举各种语言
        self.check_valid_value(self.source_lang, "Source language",
                               ['auto', 'zh', 'en', 'yue', 'wyw', 'jp', 'kor', 'fra', 'spa', 'th', 'ara', 'ru', 'pt',
                                'de', 'it', 'el', 'nl', 'pl', 'bul', 'est', 'dan', 'fin', 'cs', 'rom', 'slo', 'swe',
                                'hu', 'cht', 'vie'])
        self.check_valid_value(self.target_lang, "Target language",
                               ['auto', 'zh', 'en', 'yue', 'wyw', 'jp', 'kor', 'fra', 'spa', 'th', 'ara', 'ru', 'pt',
                                'de', 'it', 'el', 'nl', 'pl', 'bul', 'est', 'dan', 'fin', 'cs', 'rom', 'slo', 'swe',
                                'hu', 'cht', 'vie'])
# 以下是列举不同专业领域
        self.check_valid_value(self.domain, "Translate field",
                               ['it', 'finance', 'machinery', 'senimed', 'novel', 'academic', 'aerospace', 'wiki',
                                'news', 'law', 'contract'])

# 继承ComponentBase和ABC


# -------------------------------------------------------------------------------- #


class BaiduFanyi(ComponentBase, ABC):
 # 定义组件名称在系统内唯一标识该组件
    component_name = "BaiduFanyi"
    # _run()是组件的 核心执行函数,用于处理翻译请求。
# **kwargs:可以让函数更加灵活,因为它可以接受任意数量的命名参数
# 在函数内部,kwargs 是一个字典,包含了所有传递给函数的额外命名参数。
    def _run(self, history, **kwargs):
'''
self.get_input() 获取输入内容。
如果 content 存在,则用 - 连接多个内容(拼接成单个字符串)。
如果 ans 为空,直接返回 ""
'''
        ans = self.get_input()
        ans = " - ".join(ans["content"]) if "content" in ans else ""
        if not ans:
            return BaiduFanyi.be_output("")

        try:
'''
source_lang 和 target_lang:来源和目标语言。
appid:百度翻译 API 的 应用 ID。
salt:随机数(百度 API 需要此参数)。
secret_key:百度 API 的 密钥
'''
            source_lang = self._param.source_lang
            target_lang = self._param.target_lang
            appid = self._param.appid
            salt = random.randint(32768, 65536)
            secret_key = self._param.secret_key
-------------------------- ☆ 普通翻译API请求 ☆ --------------------------
            if self._param.trans_type == 'translate':
            # md5签名,防止请求被篡改
                sign = md5((appid + ans + salt + secret_key).encode('utf-8')).hexdigest()
     # 发送 HTTP POST 请求 访问 百度翻译 API
                url = 'http://api.fanyi.baidu.com/api/trans/vip/translate?' + 'q=' + ans + '&from=' + source_lang + '&to=' + target_lang + '&appid=' + appid + '&salt=' + salt + '&sign=' + sign
                headers = {"Content-Type": "application/x-www-form-urlencoded"}
            # 解析返回结果
                response = requests.post(url=url, headers=headers).json()

                if response.get('error_code'):
                    BaiduFanyi.be_output("**Error**:" + response['error_msg'])

                return BaiduFanyi.be_output(response['trans_result'][0]['dst'])
-------------------------- ★ 专业翻译API请求 ★ --------------------------
            elif self._param.trans_type == 'fieldtranslate':
                domain = self._param.domain
                sign = md5((appid + ans + salt + domain + secret_key).encode('utf-8')).hexdigest()
                url = 'http://api.fanyi.baidu.com/api/trans/vip/fieldtranslate?' + 'q=' + ans + '&from=' + source_lang + '&to=' + target_lang + '&appid=' + appid + '&salt=' + salt + '&domain=' + domain + '&sign=' + sign
                headers = {"Content-Type": "application/x-www-form-urlencoded"}
                response = requests.post(url=url, headers=headers).json()

                if response.get('error_code'):
                    BaiduFanyi.be_output("**Error**:" + response['error_msg'])

                return BaiduFanyi.be_output(response['trans_result'][0]['dst'])
        # 捕获 所有异常,防止程序崩溃
        except Exception as e:
            BaiduFanyi.be_output("**Error**:" + str(e))
针对于自定义组件参数[ComponentParamBase] 为什么里面是用的Json格式?
  • 方便序列化json.dumps()将对象转换为JSON字符串,可以轻松地保存或传输对象数据。

    base.py
        def __str__(self):
            """
            {
                "component_name": "Begin",
                "params": {}
            }
            """
            # .format()将变量插入到字符串模板中的占位符位置  {{}}转义{}
            return """{{
                  "component_name": "{}",
                  "params": {},
                  "output": {},
                  "inputs": {}
              }}""".format(self.component_name,
                         self._param,
    # json.dumps(...): 将获取的 "output" 和 "inputs" 值序列化为 JSON 格式的字符串
    # json.loads(str(self._param)): 将 self._param 转换为字符串,然后将其解析为 JSON 对象
    # self._param 转换为字符串,然后将其解析为 JSON 对象
    # 从 JSON 对象中获取 "output" 键对应的值,如果不存在则返回空字典 {}
                         json.dumps(json.loads(str(self._param)).get("output", {}), ensure_ascii=False),
                         json.dumps(json.loads(str(self._param)).get("inputs", []), ensure_ascii=False)
            )
    
  • 方便调试和展示:在 __str__ 方法中返回 JSON 字符串,可以方便地查看对象的内容。尤其是在调试过程中,直接打印出对象的 JSON 格式可以帮助开发人员快速查看对象的状态及其内部数据结构

    # 将对象转换为 JSON 字符串
        def __str__(self):
            return json.dumps(self.as_dict(), ensure_ascii=False)
    # 递归地将对象的属性转换成字典 适用于将对象序列化为 JSON 或进行其他操作
        def as_dict(self):
            def _recursive_convert_obj_to_dict(obj):
                ret_dict = {}
                for attr_name in list(obj.__dict__):
                    if attr_name in [_FEEDED_DEPRECATED_PARAMS, _DEPRECATED_PARAMS, _USER_FEEDED_PARAMS, _IS_RAW_CONF]:
                        continue
                    # get attr
                    attr = getattr(obj, attr_name)
                    if isinstance(attr, pd.DataFrame):
                        ret_dict[attr_name] = attr.to_dict()
                        continue
                    if attr and type(attr).__name__ not in dir(builtins):
                        ret_dict[attr_name] = _recursive_convert_obj_to_dict(attr)
                    else:
                        ret_dict[attr_name] = attr
                return ret_dict
            return _recursive_convert_obj_to_dict(self)
    
  • 与大模型交互:当涉及到与大语言模型(如 GPT)或其他机器学习模型的交互时,JSON 格式的数据通常是标准的输入和输出格式。将对象转为 JSON 字符串,能够更容易地将数据传递给模型进行处理或分析,模型通常会接受 JSON 格式的数据进行训练或推理。

如何去运用?
  • 数据交换:可以将这个 JSON 字符串用作 API 请求或响应的数据格式。例如,你可以将这个对象作为 HTTP 请求的 body 发送,或者从网络中获取 JSON 格式的数据,然后解析回对象。
  • 持久化存储:如果你需要将对象数据持久化到数据库或文件系统,JSON 是一个很好的存储格式。例如,将对象数据存储在文件中或数据库表的 JSON 类型字段中,便于未来读取和操作。
  • 配置和参数更新update 方法中接收到的 conf 参数实际上是一个字典对象,它通过递归的方式更新对象的属性。如果你将对象序列化为 JSON 格式后,可以将 JSON 作为配置文件传递给应用,应用根据该配置动态调整其行为。这使得系统更加灵活和可配置。
  • 验证与校验validate 方法利用存储在 JSON 文件中的规则对对象进行参数验证,这是一种常见的方式来确保输入的数据符合特定的格式或限制。可以通过动态加载配置文件来验证对象的数据,保证数据的合法性。
数据输出(元组)及转换(JSON)
----------------------------------- 输出为元组 -----------------------------------    
    def output(self, allow_partial=True) -> Tuple[str, Union[pd.DataFrame, partial]]:
        o = getattr(self._param, self._param.output_var_name)
        if not isinstance(o, partial):
            if not isinstance(o, pd.DataFrame):
                if isinstance(o, list):
                    return self._param.output_var_name, pd.DataFrame(o)
                if o is None:
                    return self._param.output_var_name, pd.DataFrame()
                return self._param.output_var_name, pd.DataFrame([{"content": str(o)}])
            return self._param.output_var_name, o

        if allow_partial or not isinstance(o, partial):
            if not isinstance(o, partial) and not isinstance(o, pd.DataFrame):
                return pd.DataFrame(o if isinstance(o, list) else [o])
            return self._param.output_var_name, o

        outs = None
        for oo in o():
            if not isinstance(oo, pd.DataFrame):
                # 最终返回的是一个元组
                outs = pd.DataFrame(oo if isinstance(oo, list) else [oo])
            else:
                outs = oo
        return self._param.output_var_name, outs

----------------------------------- 序列化为JSON -----------------------------------
 # 将对象转换为 JSON 字符串
    def __str__(self):
        return json.dumps(self.as_dict(), ensure_ascii=False)

    # 递归地将对象的属性转换成字典 适用于将对象序列化为 JSON 或进行其他操作
    def as_dict(self):
        def _recursive_convert_obj_to_dict(obj):
            ret_dict = {}
            for attr_name in list(obj.__dict__):
                if attr_name in [_FEEDED_DEPRECATED_PARAMS, _DEPRECATED_PARAMS,
                                 _USER_FEEDED_PARAMS, _IS_RAW_CONF]:
                    continue
                # get attr
                attr = getattr(obj, attr_name)
                if isinstance(attr, pd.DataFrame):
                    ret_dict[attr_name] = attr.to_dict()
                    continue
                if attr and type(attr).__name__ not in dir(builtins):
                    ret_dict[attr_name] = _recursive_convert_obj_to_dict(attr)
                else:
                    ret_dict[attr_name] = attr

            return ret_dict

        return _recursive_convert_obj_to_dict(self)
    
--------------------------------------------------------------------------------------
def get_stream_input(self):
def get_input_elements(self):
    
def get_input(self):
# 上游组件的输出:它会检查当前组件的上游组件,并尝试从这些组件获取输出数据作为当前的输入
# 如果 _param.query 存在,它会遍历查询参数,并根据参数的类型(如组件 ID)来决定如何获取相应的输入数据
# input后再通过DataFrame转换为元组 再去转换为字典
# 来回来转换是为了满足特定的接口要求或者为了确保数据在不同层之间传输时的兼容性



'''
                           使用Pandas库来处理数据
★ get_input函数: 这个函数首先检查是否有调试输入,如果有,则返回一个包含调试输入内容的DataFrame。然后,它处理查询输入,根据查询中的不同条件(如component_id、value等),调用其他函数获取相应的输入内容,并将这些内容添加到self._param.inputs列表中。最后,它通过pd.concat函数将所有上游组件[结合history]的输出合并成一个DataFrame,并返回。

★ get_input_elements函数: 这个函数根据self._param.query中的查询,构建一个输入元素列表,其中每个元素包含名称、键和值。

★get_stream_input函数: 这个函数处理流式输入,根据组件路径,调用其他函数获取相应的输入内容,并返回。

主要是处理输入参数,并根据不同的条件获取相应的输入内容。它并没有直接接受不同种类类型的入参,而是根据传入的查询参数来获取输入内容。
'''

agent/component/baidufanyi.py 【后端代码】
agent/component/_ _ init _ _.py 【初始化】
③ web/src/pages/flow/flow-drawer/index.tsx【表单抽屉组件 UI展示】
④ web/src/pages/flow/form/baidu-fanyi-form/index.tsx 【提供用户界面让用户能配置并提交参数】
⑤ web/src/pages/flow/constant.tsx 【定义一个应用程序中使用的各种图标、常量、枚举、接口、状态和函数】
⑥ web/src/locales/zh.ts 【把所有要显示的搞到zh.ts中(前端组件描述)】
⑦ web/src/pages/agent/constant.tsx【引入组件所需的各种图标、以及操作项的样式、初始值、语言等】
⑧ web/src/pages/agent/form-sheet/use-form-config-map.tsx 【表单配置映射、配置不同操作对应表单组件】
⑨ web/src/pages/agent/form/baidu-fanyi-form/index.tsx 【构建配置表单 】
⑩ web/src/pages/agent/hooks.tsx 【管理图形界面流程图 各种钩子 → 建数据处理流程】





创建动态代理组件 [实战]

📌 实现步骤

  1. 后端(Python FastAPI)
    • 创建一个 API 服务器,提供组件注册、查询、运行接口。
    • 组件信息存储在内存(可扩展到数据库)。
    • 组件代码动态执行,支持 exec() 加载。
  2. 前端(HTML + JavaScript)
    • 提供一个表单,允许用户输入组件名称参数Python 代码
    • 提交表单后,通过 fetch() 发送 POST 请求给后端。
    • 组件创建成功后,前端可以调用 run 接口测试组件。
  3. Postman 测试
    • 先创建组件 POST /api/components/create
    • 再运行组件 POST /api/components/run

1️⃣ 后端代码(FastAPI)

创建 server.py,这个文件用于启动 FastAPI 服务器,并提供 API 端点。

安装依赖(如果未安装 FastAPI 和 Uvicorn):

pip install fastapi uvicorn pandas

📌 代码(server.py):

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd

app = FastAPI()

# 组件存储(模拟数据库)
components = {}

class ComponentData(BaseModel):
    name: str
    params: dict
    code: str

@app.post("/api/components/create")
def create_component(component: ComponentData):
    """
    创建新的动态组件,存储在内存中,并使用 exec() 加载组件代码
    """
    if component.name in components:
        raise HTTPException(status_code=400, detail="组件已存在")
    
    # 安全地执行代码,存储类定义
    local_vars = {}
    exec(component.code, globals(), local_vars)
    
    if component.name not in local_vars:
        raise HTTPException(status_code=400, detail="代码中必须定义同名类")
    
    components[component.name] = {
        "params": component.params,
        "class": local_vars[component.name]
    }
    return {"message": f"组件 {component.name} 创建成功"}

@app.get("/api/components")
def list_components():
    """
    获取所有注册的组件
    """
    return {"components": list(components.keys())}

class RunRequest(BaseModel):
    component_name: str
    history: list
    params: dict

@app.post("/api/components/run")
def run_component(request: RunRequest):
    """
    运行指定的组件,并返回其输出
    """
    if request.component_name not in components:
        raise HTTPException(status_code=404, detail="组件不存在")

    component_class = components[request.component_name]["class"]
    component_instance = component_class()
    
    # 调用 _run 方法,模拟运行
    try:
        result = component_instance._run(request.history, **request.params)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"组件执行错误: {str(e)}")
    
    return {"result": result.to_dict(orient="records")}

# 运行服务器(可选:手动执行 uvicorn server:app --reload)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2️⃣ 前端代码

📌 HTML 页面

📌 代码(index.html):

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>动态组件管理</title>
</head>
<body>
    <h2>创建动态组件</h2>
    <form id="componentForm">
        <label>组件名称:</label>
        <input type="text" id="componentName" required><br><br>

        <label>参数 (JSON 格式):</label>
        <textarea id="componentParams" required>{"param1": "value1"}</textarea><br><br>

        <label>Python 代码:</label>
        <textarea id="componentCode" required>
class TestComponent:
    def _run(self, history, **kwargs):
        return pd.DataFrame([{'content': 'Hello, this is a test!'}])
        </textarea><br><br>

        <button type="button" onclick="submitNewComponent()">提交组件</button>
    </form>

    <h2>运行组件</h2>
    <label>组件名称:</label>
    <input type="text" id="runComponentName" required><br><br>
    <button type="button" onclick="runComponent()">运行组件</button>

    <h3>运行结果:</h3>
    <pre id="result"></pre>

    <script src="test.js"></script>
</body>
</html>

📌 JavaScript 代码

📌 代码(test.js):

const apiBaseUrl = "http://localhost:8000/api";  // 替换为你的服务器地址

function submitNewComponent() {
    const name = document.getElementById("componentName").value;
    const params = document.getElementById("componentParams").value;
    const code = document.getElementById("componentCode").value;

    fetch(`${apiBaseUrl}/components/create`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
            name: name,
            params: JSON.parse(params),
            code: code
        })
    })
    .then(response => response.json())
    .then(data => alert(data.message || JSON.stringify(data)))
    .catch(error => console.error("Error:", error));
}

function runComponent() {
    const componentName = document.getElementById("runComponentName").value;

    fetch(`${apiBaseUrl}/components/run`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
            component_name: componentName,
            history: [],
            params: {}
        })
    })
    .then(response => response.json())
    .then(data => {
        document.getElementById("result").innerText = JSON.stringify(data, null, 2);
    })
    .catch(error => console.error("Error:", error));
}

3️⃣ 使用 Postman 测试

启动服务器后,可以使用 Postman 测试 API。

(1)创建动态组件

  • 方法POST

  • URLhttp://localhost:8000/api/components/create

  • Body(JSON)

    {
      "name": "TestComponent",
      "params": {
        "param1": "value1"
      },
      "code": "class TestComponent:\n    def _run(self, history, **kwargs):\n        return pd.DataFrame([{'content': 'Hello, world!'}])"
    }
    
  • 期望结果

    {
      "message": "组件 TestComponent 创建成功"
    }
    

(2)运行组件

  • 方法POST

  • URLhttp://localhost:8000/api/components/run

  • Body(JSON)

    {
      "component_name": "TestComponent",
      "history": [],
      "params": {}
    }
    
  • 期望结果

    {
      "result": [{"content": "Hello, world!"}]
    }
    

总结

  • 后端:FastAPI 实现了组件创建、查询、运行的 API。
  • 前端:HTML + JS 提供交互界面,用户可输入参数和代码。
  • 测试:可用 Postman 调试,确保组件能被动态创建和运行。

🚀 现在,你可以在 Postman 或前端界面中测试你的动态组件功能了!


纯后端情况如何添加自定义组件呢

这里以我琢磨的JSONPath自定义组件为例

首先要清楚RAGFlow的执行流程 要了解画布的流程 推荐先看一遍RAGFlow执行流程的文档 然后了解一下canvas.py和canvas_app.py的源码 掌握整体流程。

这里最重要的一个思想要明白 现在你只有后端 你没有前端 你在RAGFlow图形化界面拖拽进去的形式已经无法针对于纯后端添加自定义组件了 那怎么办?我们来梳理一下思路 我们想要什么?我们想要图形化界面拖拽过后的整体数据 也就是数据库里的dsl,我们要拿到它 然后用我们自己开发的自定义组件去解析这个大大的JSON。那么这里就涉及到没有前端 你如何去后端去添加自己做的自定义组件!当然常规方法 去图形化界面拖拽已经无法完成这种操作 你只能把希望寄托在数据库

我们来打开数据库 此时你需要一个组件的数据表 component

– rag_flow.component definition

CREATE TABLE component (

id varchar(32) NOT NULL,

create_time bigint DEFAULT NULL,

create_date datetime DEFAULT NULL,

update_time bigint DEFAULT NULL,

update_date datetime DEFAULT NULL,

tenant_id varchar(32) NOT NULL,

module varchar(255) NOT NULL,

created_by varchar(32) NOT NULL,

is_deleted tinyint DEFAULT ‘0’,

PRIMARY KEY (id)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

这是创建表的信息 目前我们只需要关注module里面的 我们要把我们自己开发的组件按照那些形式添加进去 我的组件叫:”jsonpath” 那么我的module里面就应该是agent.component.jsonpath 当然你要按照它的风格把组件放在这个包里面。

其次还要注意在这个包里面的__init__.py

把你的组件加进去 “jsonpath”:”JSONPath”

此时并非万事大吉 因为我们目前只有纯后端 没有前端那些 所以回到开头所说 你要自己把你的组件以json的格式手动加入到数据库的dsl中 因为很长所以你需要JSON在线解析格式化验证 - JSON.cn去让它展示的更直观

这是我组件需要的json 你要手动的去添加到dsl 步骤就是先复制数据库的dsl 然后放在解析器里面 手动的去添加自己的组件

“JSONPath:PathProcessor”: {“obj”: {“component_name”: “JSONPath”,”params”: {“output_var_name”: “output”,”jsonpath_expr”: “$.components[*].obj.component_name”,”debug_mode”: true,”query”: [],”inputs”: [],”debug_inputs”: [],”output”: {“content”: {“0”: “”}}}},”downstream”: [“Generate:TwelveClocksSpeak”],”upstream”: []}

这只是我的组件json 你需要在完整的dsl把它插入进去 按照它们形成的格式 这里说明一下 dsl就是你在图形化界面创建agent的时候里面各各组件的相关信息 当然你还要注意自己写的组件类型 是用来干什么的 你才能去做到对应相应的处理 比如说我这个是解析json的 那我就不能去拿上下文组件的输入或输出 正常的流程是连线后你要拿到上游的输出结果 作为你组件的输入结果。我这个则是拿到全局画布canvas_data数据再去进行解析

所以要明白需求才能去更好的融入代码中 经过这几天的学习真的感觉 只有自己一点点的摸索过后才能真正的印象深刻 才能慢慢变得强大!

阅读全文

RAGFlow工具技术文档

2025/3/7

RAGFlow代理组件的核心

  • 集成外部数据源:通过检索模块来继承外部知识库、数据库、文档等内容,增强模型的回答能力
  • 生成式任务处理:利用模型生成的能力,基于用户的查询返回自然语言的回答
  • 模块化:RAGFlow支持灵活的模块化设计,可以通过配置和集成不同的组件来满足特定的需求

RAGFlow代理组件

RAGFlow 的代理组件是其架构中的核心部分,负责管理与外部数据源生成模型的交互。代理组件不仅可以从不同的外部数据源中获取知识,还能将检索到的信息与生成模型结合,从而生成更加准确的回答。

代理组件的功能

  • 数据检索:代理组件会从多个外部数据源(如数据库、文档、API等)进行数据检索,获取与用户查询相关的信息。
  • 生成模型的调用:将检索到的信息与生成模型结合,通过生成模型提供一个流畅且准确的答案。
  • 多源数据融合:在生成回答时,代理组件能够处理多来源的数据,并将其融合成最终的答案。

组件 → web/src/utils *待定

组件前端交互中心web/src/utils/api.ts
它定义了一些与“画布”相关的 API 接口地址,用于与后端进行通信。通过这些接口,前端应用可以进行不同操作,如获取模板、操作画布、调试等

listTemplates: 用于获取所有画布模板的列表,发送请求到 /canvas/templates
listCanvas: 获取画布列表的接口,发送请求到 /canvas/list

Begin组件

Begin 组件设置开场问候语或接受用户的输入。当您创建代理时,它会自动填充到画布上,无论是从模板还是从头开始(从空白模板)。工作流程中应该只有一个 Begin 组件。

api/ragflow/web/src/locales/zh.ts
……
setAnOpenerInitial: 你好! 我是你的助理,有什么可以帮到你的吗?,

静态消息: /v1/canvas/set
生成问答、问题优化、问题类型、关键词/v1/llm/list
知识检索/v1/llm/list + /v1/kb/list

以**问题优化 **(RewriteQuestion:MoodyCrabsPeel) 为例:


后端内容

知识检索Retrieval 路径→agent/component/retrieval.py

# 记录日志
import logging
# 抽象基类模块
from abc import ABC
# 用于数据处理,尤其是处理数据框(DataFrame)
import pandas as pd
# 从数据库相关模块导入 LLMType,这个可能是用来定义 LLM(语言模型)类型的枚举
from api.db import LLMType
# 从数据库服务模块中导入,分别用于知识库服务和 LLM 配置的封装
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.llm_service import LLMBundle
# 加载应用程序的设置
from api import settings
# 基础组件类,用于构建此组件的父类
from agent.component.base import ComponentBase, ComponentParamBase
# 从标签模块中导入,用于对查询进行标记
from rag.app.tag import label_question


class RetrievalParam(ComponentParamBase):

    """
    Define the Retrieval component parameters.
    """
    def __init__(self):
        super().__init__()
        # 相似度阈值
        self.similarity_threshold = 0.2
        # 关键字相似度的权重
        self.keywords_similarity_weight = 0.5
        # 检索时返回的结果数
        self.top_n = 8
        # 在检索时考虑的最大数量
        self.top_k = 1024
        # 知识库的 ID 列表
        self.kb_ids = []
        # 重新排序模型的 ID (可能为空)
        self.rerank_id = ""
        # 当没有找到任何结果时的响应文本
        self.empty_response = ""
#### check_decimal_float???
   # check():对参数进行验证,确保相似度阈值和权重为有效的小数,且 top_n 是正数。
    def check(self):
        # "相似性阈值"
        self.check_decimal_float(self.similarity_threshold, "[Retrieval] Similarity threshold")
        # "关键词相似度权重"
        self.check_decimal_float(self.keywords_similarity_weight, "[Retrieval] Keyword similarity weight")
        self.check_positive_number(self.top_n, "[Retrieval] Top N")

# Retrieval 类继承自 ComponentBase 和 ABC(抽象基类)
# 表示这是一个组件,并实现了抽象方法
class Retrieval(ComponentBase, ABC):
    component_name = "Retrieval"

##### history, **kwargs???
    # _run为核心方法 
# query输入查询内容 包括'content'键 提取第一个内容项 将查询内容转换为字符串
    def _run(self, history, **kwargs):
        query = self.get_input()
        query = str(query["content"][0]) if "content" in query else ""
    # 从知识库服务中获取指定 ID 的知识库。如果没有找到相关的知识库,返回空的响应。
        kbs = KnowledgebaseService.get_by_ids(self._param.kb_ids)
        if not kbs:
            return Retrieval.be_output("")
# 获取所有知识库的嵌入模型 ID(embd_id),确保所有知识库使用相同的嵌入模型。
# 如果使用的嵌入模型不一致,抛出异常
        embd_nms = list(set([kb.embd_id for kb in kbs]))
        assert len(embd_nms) == 1, "Knowledge bases use different embedding models."

##### LLMBundle???
# 创建 LLMBundle 实例来封装嵌入模型,使用画布的租户 ID 和嵌入模型的 ID 配置嵌入模型,并设置到画布
        embd_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING, embd_nms[0])
        self._canvas.set_embedding_model(embd_nms[0])

   # 若提供了 rerank_id,则加载重新排序的模型
        rerank_mdl = None
        if self._param.rerank_id:
            rerank_mdl = LLMBundle(kbs[0].tenant_id, LLMType.RERANK, self._param.rerank_id)

  # 使用 settings.retrievaler.retrieval 方法执行知识检索操作,传入查询和各种配置参数
# query:用户查询   embd_mdl:嵌入模型    其他参数包括知识库 ID、相似度阈值、关键词相似度权重等
        kbinfos = settings.retrievaler.retrieval(query, embd_mdl, kbs[0].tenant_id, self._param.kb_ids,
                                        1, self._param.top_n,
                                        self._param.similarity_threshold, 1 - self._param.keywords_similarity_weight,
                                        aggs=False, rerank_mdl=rerank_mdl,
                                        rank_feature=label_question(query, kbs))
# 如果检索结果为空(没有找到相关内容),则返回空响应
# 如果设置了 empty_response,则在响应中返回该内容。
        if not kbinfos["chunks"]:
            df = Retrieval.be_output("")
            if self._param.empty_response and self._param.empty_response.strip():
                df["empty_response"] = self._param.empty_response
            return df
'''
如果有检索结果,将结果转换为 Pandas 的 DataFrame 格式。
将 content_with_weight 重命名为 content,然后删除 content_with_weight 列。
输出调试日志,记录查询和检索结果。
'''
        df = pd.DataFrame(kbinfos["chunks"])
        df["content"] = df["content_with_weight"]
        del df["content_with_weight"]
        logging.debug("{} {}".format(query, df))
        return df

如果你需要一个 定制的代理组件,我可以基于这个 Retrieval 组件的架构,为你设计一个类似的组件,满足你的特定需求。

你可以告诉我:

  1. 代理组件的具体用途(比如:请求转发、智能路由、流量控制、负载均衡、身份验证等)。
  2. 需要支持的参数(比如:代理地址、超时时间、认证方式等)。
  3. 核心功能(比如:日志记录、请求缓存、故障恢复等)。
  4. 集成的其他服务(比如:Redis、RabbitMQ、数据库等)。

如果你希望它遵循 ComponentBase 组件框架,我可以按照该模式来实现,让它可以与现有系统无缝对接!

下面的是引用的重要方法上一个代码块里的

retrieval.pybase.py

**kwargs 允许调用 _run() 方法时 传入任意数量的额外参数,即使这些参数没有在方法签名中显式列出#上方代码索引:#### check_decimal_float???
# check():对参数进行验证,确保【相似度阈值】和【权重】为有效的小数,且 top_n 是正数。
    def check(self):
        # "相似性阈值"
        self.check_decimal_float(self.similarity_threshold, "[Retrieval] Similarity threshold")
        # "关键词相似度权重"
        self.check_decimal_float(self.keywords_similarity_weight, "[Retrieval] Keyword similarity weight")
        self.check_positive_number(self.top_n, "[Retrieval] Top N")
---------------------------  ☆ 上方代码块片段 ☆  ---------------------------

# 校验检索相关参数的合法性
# 这是一个 静态方法,用于校验参数 param 是否是 0 到 1 之间的浮点数或整数

'''
校验逻辑:
param 必须是 float 或 int 类型(不允许 str、list 等)。
param 需要在 [0, 1] 之间,否则抛出 ValueError 异常,并在错误信息中包含 descr 描述。
'''
# 这里为什么是0-1 不能超过1? 如果 similarity_threshold = 1.2,会触发 check_decimal_float 的异常抛出,提示超出 [0,1] 范围。而且在前端的拖拉中也只会是0~1之间。因为这些参数代表 归一化(normalized)后的比例或权重,它们的值通常不能超过 1
# 【0表示示完全相同】
    @staticmethod
    def check_decimal_float(param, descr):
        if type(param).__name__ not in ["float", "int"] or param < 0 or param > 1:
            raise ValueError(
                descr
                + " {} not supported, should be a float number in range [0, 1]".format(
                    param
                )
            )
'''
这里为什么要用 @staticmethod?
1. 避免创建对象,提高效率 👉 由于静态方法不依赖实体属性或方法,因此可以直接通过类调用,而不必创建对象
2. 代码组织清晰 👉 适用于逻辑独立的方法,可以让代码更容易读、更结构化
3. 防止修改实例状态 👉 由于静态方法不能访问self,因此不会修改实例的状态,保证了方法的纯函数特征
-------- -------- -------- 结合代码解析 -------- -------- -------- 
4. 这个方法 不访问实例变量,它只 检查参数 是否符合 0~1 之间的范围
5. 适用于 工具方法,属于 数据验证逻辑,不依赖 self
6. 可以直接通过类调用:
Retrieval.check_decimal_float(0.8, "Threshold")  # 正常
Retrieval.check_decimal_float(1.2, "Threshold")  # 抛出 ValueError
'''
==========================================================================
#上方代码索引:##### history, **kwargs???
    def _run(self, history, **kwargs):
        query = self.get_input()
        query = str(query["content"][0]) if "content" in query else ""
        ......
# 讲解重点:【**kwargs】
# [**kwargs] 允许调用 _run() 方法时 传入任意数量的额外参数,即使这些参数没有在方法签名中显式列出 比如:👇
    _run(self, history, user_id=123, debug=True, mode="fast")
# 在 _run() 方法内部,这些参数可以通过 kwargs["user_id"]、kwargs["debug"] 访问,但 当前代码里没有使用 kwargs,所以它只是用来兼容可能的扩展。
'''
目前 **kwargs 没有被使用,它的作用是:

扩展性:可以传递额外的参数而不修改函数签名
兼容性:允许不同调用方传递不同的参数,而 _run() 仍能正常运行
适用场景:未来需要额外参数
目前 **kwargs 只是 占位符,但它提供了未来扩展的可能性
'''
特征 普通方法(self) 静态方法(@staticmethod) 类方法(@classmethod)
依赖实例(self) × ×
依赖类(cls) × ×
访问实例变量 × ×
访问类变量 ×
适用于 需要访问实例 工具方法、验证方法 需要修改类变量

retrieval.py




让我们再来看一个案例

百度翻译baidufanyi.py 路径 → agent/component/baidufanyi.py

# 
import random
# 抽象基类模块 用于定义组件的基类
from abc import ABC
# 用于发送 HTTP 请求,与 百度翻译 API 交互
import requests
# 基础组件类,于构建此组件的父类,定义组件的基本结构和参数。
from agent.component.base import ComponentBase, ComponentParamBase
# 用于生成百度翻译 API 请求签名(加密哈希)
from hashlib import md5

# 百度翻译组件的参数定义类 继承ComponentParamBase
class BaiduFanyiParam(ComponentParamBase):
    """
    Define the BaiduFanyi component parameters.
    """

    def __init__(self):
        super().__init__()
   # appid 和 secret_key 是百度翻译 API 的 身份验证 信息(需要在百度翻译开放平台获取)
        self.appid = "xxx"
        self.secret_key = "xxx"
   # translate:普通翻译   fieldtranslate:专业领域翻译
        self.trans_type = 'translate'
        self.parameters = []
   # source_lang 翻译的源语言   target_lang 翻译的目标语言
        self.source_lang = 'auto'
        self.target_lang = 'auto'
   # domain 如果使用专业领域翻译,需指定领域(如 finance 表示 金融 领域)
        self.domain = 'finance'
# 检查参数合法性
    def check(self):
    # check_empty():确保 appid 和 secret_key 不能为空。
        self.check_empty(self.appid, "BaiduFanyi APPID")
        self.check_empty(self.secret_key, "BaiduFanyi Secret Key")
# check_valid_value():确保 trans_type、source_lang、target_lang、domain 的值在合法选项之内
        self.check_valid_value(self.trans_type, "Translate type", ['translate', 'fieldtranslate'])
# 以下是列举各种语言
        self.check_valid_value(self.source_lang, "Source language",
                               ['auto', 'zh', 'en', 'yue', 'wyw', 'jp', 'kor', 'fra', 'spa', 'th', 'ara', 'ru', 'pt',
                                'de', 'it', 'el', 'nl', 'pl', 'bul', 'est', 'dan', 'fin', 'cs', 'rom', 'slo', 'swe',
                                'hu', 'cht', 'vie'])
        self.check_valid_value(self.target_lang, "Target language",
                               ['auto', 'zh', 'en', 'yue', 'wyw', 'jp', 'kor', 'fra', 'spa', 'th', 'ara', 'ru', 'pt',
                                'de', 'it', 'el', 'nl', 'pl', 'bul', 'est', 'dan', 'fin', 'cs', 'rom', 'slo', 'swe',
                                'hu', 'cht', 'vie'])
# 以下是列举不同专业领域
        self.check_valid_value(self.domain, "Translate field",
                               ['it', 'finance', 'machinery', 'senimed', 'novel', 'academic', 'aerospace', 'wiki',
                                'news', 'law', 'contract'])

# 继承ComponentBase和ABC
class BaiduFanyi(ComponentBase, ABC):
 # 定义组件名称在系统内唯一标识该组件
    component_name = "BaiduFanyi"
    # _run()是组件的 核心执行函数,用于处理翻译请求。
# **kwargs:可以让函数更加灵活,因为它可以接受任意数量的命名参数
# 在函数内部,kwargs 是一个字典,包含了所有传递给函数的额外命名参数。
    def _run(self, history, **kwargs):
'''
self.get_input() 获取输入内容。
如果 content 存在,则用 - 连接多个内容(拼接成单个字符串)。
如果 ans 为空,直接返回 ""
'''
        ans = self.get_input()
        ans = " - ".join(ans["content"]) if "content" in ans else ""
        if not ans:
            return BaiduFanyi.be_output("")

        try:
'''
source_lang 和 target_lang:来源和目标语言。
appid:百度翻译 API 的 应用 ID。
salt:随机数(百度 API 需要此参数)。
secret_key:百度 API 的 密钥
'''
            source_lang = self._param.source_lang
            target_lang = self._param.target_lang
            appid = self._param.appid
            salt = random.randint(32768, 65536)
            secret_key = self._param.secret_key
-------------------------- ☆ 普通翻译API请求 ☆ --------------------------
            if self._param.trans_type == 'translate':
            # md5签名,防止请求被篡改
                sign = md5((appid + ans + salt + secret_key).encode('utf-8')).hexdigest()
     # 发送 HTTP POST 请求 访问 百度翻译 API
                url = 'http://api.fanyi.baidu.com/api/trans/vip/translate?' + 'q=' + ans + '&from=' + source_lang + '&to=' + target_lang + '&appid=' + appid + '&salt=' + salt + '&sign=' + sign
                headers = {"Content-Type": "application/x-www-form-urlencoded"}
            # 解析返回结果
                response = requests.post(url=url, headers=headers).json()

                if response.get('error_code'):
                    BaiduFanyi.be_output("**Error**:" + response['error_msg'])

                return BaiduFanyi.be_output(response['trans_result'][0]['dst'])
-------------------------- ★ 专业翻译API请求 ★ --------------------------
            elif self._param.trans_type == 'fieldtranslate':
                domain = self._param.domain
                sign = md5((appid + ans + salt + domain + secret_key).encode('utf-8')).hexdigest()
                url = 'http://api.fanyi.baidu.com/api/trans/vip/fieldtranslate?' + 'q=' + ans + '&from=' + source_lang + '&to=' + target_lang + '&appid=' + appid + '&salt=' + salt + '&domain=' + domain + '&sign=' + sign
                headers = {"Content-Type": "application/x-www-form-urlencoded"}
                response = requests.post(url=url, headers=headers).json()

                if response.get('error_code'):
                    BaiduFanyi.be_output("**Error**:" + response['error_msg'])

                return BaiduFanyi.be_output(response['trans_result'][0]['dst'])
        # 捕获 所有异常,防止程序崩溃
        except Exception as e:
            BaiduFanyi.be_output("**Error**:" + str(e))

深层次研究

from agent.component.base import ComponentBase, ComponentParamBase

主要用于处理、更新、验证和警告组件参数,适用于那些需要动态配置、递归嵌套结构且要求高可扩展性的系统

class ComponentParamBase(ABC):
    def __init__(self):
        # 初始化输出变量名
        self.output_var_name = "output"
        # 初始化消息历史窗口大小
        self.message_history_window_size = 22
        # 初始化查询参数
        self.query = []
        # 初始化输入参数
        self.inputs = []
        # 初始化调试输入参数
        self.debug_inputs = []
    # 设置组件的名称,并返回当前对象,支持链式调用
    def set_name(self, name: str):
        self._name = name
        return self

    # 抽象方法 子类必须实现 它的目的是检查参数是否有效
    def check(self):
        raise NotImplementedError("Parameter Object should be checked.")

    # 类方法 用于检查类是否具有某个属性 _DEPRECATED_PARAMS 如果没有 则初始化一个空的集合
    @classmethod
    def _get_or_init_deprecated_params_set(cls):
        if not hasattr(cls, _DEPRECATED_PARAMS):
            setattr(cls, _DEPRECATED_PARAMS, set())
        return getattr(cls, _DEPRECATED_PARAMS)
    # 用于检查和初始化实例的已废弃参数集合
    def _get_or_init_feeded_deprecated_params_set(self, conf=None):
        if not hasattr(self, _FEEDED_DEPRECATED_PARAMS):
            if conf is None:
                setattr(self, _FEEDED_DEPRECATED_PARAMS, set())
            else:
                setattr(
                    self,
                    _FEEDED_DEPRECATED_PARAMS,
                    set(conf[_FEEDED_DEPRECATED_PARAMS]),
                )
        return getattr(self, _FEEDED_DEPRECATED_PARAMS)
    # 用于检查和初始化实例的已废弃参数集合
    def _get_or_init_user_feeded_params_set(self, conf=None):
        if not hasattr(self, _USER_FEEDED_PARAMS):
            if conf is None:
                setattr(self, _USER_FEEDED_PARAMS, set())
            else:
                setattr(self, _USER_FEEDED_PARAMS, set(conf[_USER_FEEDED_PARAMS]))
        return getattr(self, _USER_FEEDED_PARAMS)
    # 返回用户提供的参数
    def get_user_feeded(self):
        return self._get_or_init_user_feeded_params_set()
    # 返回已废弃的参数集合
    def get_feeded_deprecated_params(self):
        return self._get_or_init_feeded_deprecated_params_set()
    
    # @装饰器 定义了 _deprecated_params_set 属性 它返回已废弃的参数集合
    @property
    def _deprecated_params_set(self):
        return {name: True for name in self.get_feeded_deprecated_params()}
    
    # 将对象转换为 JSON 字符串
    def __str__(self):
        return json.dumps(self.as_dict(), ensure_ascii=False)

    # 递归地将对象的属性转换成字典 适用于将对象序列化为 JSON 或进行其他操作
    def as_dict(self):
        def _recursive_convert_obj_to_dict(obj):
            ret_dict = {}
            for attr_name in list(obj.__dict__):
                if attr_name in [_FEEDED_DEPRECATED_PARAMS, _DEPRECATED_PARAMS, _USER_FEEDED_PARAMS, _IS_RAW_CONF]:
                    continue
                # get attr
                attr = getattr(obj, attr_name)
                if isinstance(attr, pd.DataFrame):
                    ret_dict[attr_name] = attr.to_dict()
                    continue
                if attr and type(attr).__name__ not in dir(builtins):
                    ret_dict[attr_name] = _recursive_convert_obj_to_dict(attr)
                else:
                    ret_dict[attr_name] = attr

            return ret_dict

        return _recursive_convert_obj_to_dict(self)

    # 用于更新对象的参数,并根据传入的配置进行修改 它处理递归更新参数、检查冗余属性等
    def update(self, conf, allow_redundant=False):
        update_from_raw_conf = conf.get(_IS_RAW_CONF, True)
        if update_from_raw_conf:
            deprecated_params_set = self._get_or_init_deprecated_params_set()
            feeded_deprecated_params_set = (
                self._get_or_init_feeded_deprecated_params_set()
            )
            user_feeded_params_set = self._get_or_init_user_feeded_params_set()
            setattr(self, _IS_RAW_CONF, False)
        else:
            feeded_deprecated_params_set = (
                self._get_or_init_feeded_deprecated_params_set(conf)
            )
            user_feeded_params_set = self._get_or_init_user_feeded_params_set(conf)

        def _recursive_update_param(param, config, depth, prefix):
            if depth > settings.PARAM_MAXDEPTH:
                raise ValueError("Param define nesting too deep!!!, can not parse it")

            inst_variables = param.__dict__
            redundant_attrs = []
            for config_key, config_value in config.items():
                # redundant attr
                if config_key not in inst_variables:
                    if not update_from_raw_conf and config_key.startswith("_"):
                        setattr(param, config_key, config_value)
                    else:
                        setattr(param, config_key, config_value)
                        # redundant_attrs.append(config_key)
                    continue

                full_config_key = f"{prefix}{config_key}"

                if update_from_raw_conf:
                    # add user feeded params
                    user_feeded_params_set.add(full_config_key)

                    # update user feeded deprecated param set
                    if full_config_key in deprecated_params_set:
                        feeded_deprecated_params_set.add(full_config_key)

                # supported attr
                attr = getattr(param, config_key)
                if type(attr).__name__ in dir(builtins) or attr is None:
                    setattr(param, config_key, config_value)

                else:
                    # recursive set obj attr
                    sub_params = _recursive_update_param(
                        attr, config_value, depth + 1, prefix=f"{prefix}{config_key}."
                    )
                    setattr(param, config_key, sub_params)

            if not allow_redundant and redundant_attrs:
                raise ValueError(
                    f"cpn `{getattr(self, '_name', type(self))}` has redundant parameters: `{[redundant_attrs]}`"
                )

            return param

        return _recursive_update_param(param=self, config=conf, depth=0, prefix="")

    # 用于提取所有非内置类型的属性 递归地遍历对象的属性
    def extract_not_builtin(self):
        def _get_not_builtin_types(obj):
            ret_dict = {}
            for variable in obj.__dict__:
                attr = getattr(obj, variable)
                if attr and type(attr).__name__ not in dir(builtins):
                    ret_dict[variable] = _get_not_builtin_types(attr)

            return ret_dict

        return _get_not_builtin_types(self)
    
    # validate 方法用于验证对象的参数是否符合预定义的规则 规则存储在 JSON 文件中
    def validate(self):
        self.builtin_types = dir(builtins)
        self.func = {
            "ge": self._greater_equal_than,
            "le": self._less_equal_than,
            "in": self._in,
            "not_in": self._not_in,
            "range": self._range,
        }
        home_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
        param_validation_path_prefix = home_dir + "/param_validation/"

        param_name = type(self).__name__
        param_validation_path = "/".join(
            [param_validation_path_prefix, param_name + ".json"]
        )

        validation_json = None

        try:
            with open(param_validation_path, "r") as fin:
                validation_json = json.loads(fin.read())
        except BaseException:
            return

        self._validate_param(self, validation_json)

    def _validate_param(self, param_obj, validation_json):
        default_section = type(param_obj).__name__
        var_list = param_obj.__dict__

        for variable in var_list:
            attr = getattr(param_obj, variable)

            if type(attr).__name__ in self.builtin_types or attr is None:
                if variable not in validation_json:
                    continue

                validation_dict = validation_json[default_section][variable]
                value = getattr(param_obj, variable)
                value_legal = False

                for op_type in validation_dict:
                    if self.func[op_type](value, validation_dict[op_type]):
                        value_legal = True
                        break

                if not value_legal:
                    raise ValueError(
                        "Plase check runtime conf, {} = {} does not match user-parameter restriction".format(
                            variable, value
                        )
                    )

            elif variable in validation_json:
                self._validate_param(attr, validation_json)

    # 用于验证参数的类型和范围 确保符合预期          
    @staticmethod
    def check_string(param, descr):
        if type(param).__name__ not in ["str"]:
            raise ValueError(
                descr + " {} not supported, should be string type".format(param)
            )

    @staticmethod
    def check_empty(param, descr):
        if not param:
            raise ValueError(
                descr + " does not support empty value."
            )

    @staticmethod
    def check_positive_integer(param, descr):
        if type(param).__name__ not in ["int", "long"] or param <= 0:
            raise ValueError(
                descr + " {} not supported, should be positive integer".format(param)
            )

    @staticmethod
    def check_positive_number(param, descr):
        if type(param).__name__ not in ["float", "int", "long"] or param <= 0:
            raise ValueError(
                descr + " {} not supported, should be positive numeric".format(param)
            )

    @staticmethod
    def check_nonnegative_number(param, descr):
        if type(param).__name__ not in ["float", "int", "long"] or param < 0:
            raise ValueError(
                descr
                + " {} not supported, should be non-negative numeric".format(param)
            )

    @staticmethod
    def check_decimal_float(param, descr):
        if type(param).__name__ not in ["float", "int"] or param < 0 or param > 1:
            raise ValueError(
                descr
                + " {} not supported, should be a float number in range [0, 1]".format(
                    param
                )
            )

    @staticmethod
    def check_boolean(param, descr):
        if type(param).__name__ != "bool":
            raise ValueError(
                descr + " {} not supported, should be bool type".format(param)
            )

    @staticmethod
    def check_open_unit_interval(param, descr):
        if type(param).__name__ not in ["float"] or param <= 0 or param >= 1:
            raise ValueError(
                descr + " should be a numeric number between 0 and 1 exclusively"
            )

    @staticmethod
    def check_valid_value(param, descr, valid_values):
        if param not in valid_values:
            raise ValueError(
                descr
                + " {} is not supported, it should be in {}".format(param, valid_values)
            )

    @staticmethod
    def check_defined_type(param, descr, types):
        if type(param).__name__ not in types:
            raise ValueError(
                descr + " {} not supported, should be one of {}".format(param, types)
            )

    @staticmethod
    def check_and_change_lower(param, valid_list, descr=""):
        if type(param).__name__ != "str":
            raise ValueError(
                descr
                + " {} not supported, should be one of {}".format(param, valid_list)
            )

        lower_param = param.lower()
        if lower_param in valid_list:
            return lower_param
        else:
            raise ValueError(
                descr
                + " {} not supported, should be one of {}".format(param, valid_list)
            )

    @staticmethod
    def _greater_equal_than(value, limit):
        return value >= limit - settings.FLOAT_ZERO

    @staticmethod
    def _less_equal_than(value, limit):
        return value <= limit + settings.FLOAT_ZERO

    @staticmethod
    def _range(value, ranges):
        in_range = False
        for left_limit, right_limit in ranges:
            if (
                    left_limit - settings.FLOAT_ZERO
                    <= value
                    <= right_limit + settings.FLOAT_ZERO
            ):
                in_range = True
                break

        return in_range

    @staticmethod
    def _in(value, right_value_list):
        return value in right_value_list

    @staticmethod
    def _not_in(value, wrong_value_list):
        return value not in wrong_value_list

    def _warn_deprecated_param(self, param_name, descr):
        if self._deprecated_params_set.get(param_name):
            logging.warning(
                f"{descr} {param_name} is deprecated and ignored in this version."
            )

    def _warn_to_deprecate_param(self, param_name, descr, new_param):
        if self._deprecated_params_set.get(param_name):
            logging.warning(
                f"{descr} {param_name} will be deprecated in future release; "
                f"please use {new_param} instead."
            )
            return True
        return False
这里运用到json 为什么要用json?
  • 方便序列化json.dumps()将对象转换为JSON字符串,可以轻松地保存或传输对象数据。
  • 方便调试和展示:在 __str__ 方法中返回 JSON 字符串,可以方便地查看对象的内容。尤其是在调试过程中,直接打印出对象的 JSON 格式可以帮助开发人员快速查看对象的状态及其内部数据结构
  • 与大模型交互:当涉及到与大语言模型(如 GPT)或其他机器学习模型的交互时,JSON 格式的数据通常是标准的输入和输出格式。将对象转为 JSON 字符串,能够更容易地将数据传递给模型进行处理或分析,模型通常会接受 JSON 格式的数据进行训练或推理。
如何去运用?
  • 数据交换:可以将这个 JSON 字符串用作 API 请求或响应的数据格式。例如,你可以将这个对象作为 HTTP 请求的 body 发送,或者从网络中获取 JSON 格式的数据,然后解析回对象。
  • 持久化存储:如果你需要将对象数据持久化到数据库或文件系统,JSON 是一个很好的存储格式。例如,将对象数据存储在文件中或数据库表的 JSON 类型字段中,便于未来读取和操作。
  • 配置和参数更新update 方法中接收到的 conf 参数实际上是一个字典对象,它通过递归的方式更新对象的属性。如果你将对象序列化为 JSON 格式后,可以将 JSON 作为配置文件传递给应用,应用根据该配置动态调整其行为。这使得系统更加灵活和可配置。
  • 验证与校验validate 方法利用存储在 JSON 文件中的规则对对象进行参数验证,这是一种常见的方式来确保输入的数据符合特定的格式或限制。可以通过动态加载配置文件来验证对象的数据,保证数据的合法性。

怎么和大模型交互 json是以什么规范 是否让机器更好的理解?

如果自己开发了一个代理组件后如何去融入呢?

index.tsx

路径web/src/pages/flow/form/baidu-fanyi-form/index.tsx

BaiduFanyiForm 是一个用于配置百度翻译(Baidu Fanyi)接口参数的表单组件。该组件基于 Ant Design (antd) 的 Form 组件,允许用户输入翻译所需的 AppID、密钥、翻译类型、领域、源语言和目标语言

// useTranslate('flow'):自定义 Hook,提供国际化翻译能力
import { useTranslate } from '@/hooks/common-hooks';
// Form, Input, Select:Ant Design 表单组件
import { Form, Input, Select } from 'antd';
// useMemo:优化计算、提高性能,避免不必要的重新计算
import { useMemo } from 'react';
// 从 constant 中引入翻译领域和语言选项
import {
  BaiduFanyiDomainOptions,
  BaiduFanyiSourceLangOptions,
} from '../../constant';
// 表单组件的类型定义,包含 onValuesChange, form, node 等参数
import { IOperatorForm } from '../../interface';
// 动态输入组件,可能用于变量替换
import DynamicInputVariable from '../components/dynamic-input-variable';

/*
生成 trans_type(翻译类型)的 Select 选项:
translate:普通翻译
fieldtranslate:专业领域翻译
选项的 label 通过 t() 进行国际化翻译。
*/
const BaiduFanyiForm = ({ onValuesChange, form, node }: IOperatorForm) => {
  const { t } = useTranslate('flow');
  const options = useMemo(() => {
    return ['translate', 'fieldtranslate'].map((x) => ({
      value: x,
      label: t(`baiduSecretKeyOptions.${x}`),
    }));
  }, [t]);
// 生成 领域翻译 选项(如医学、法律等)
  const baiduFanyiOptions = useMemo(() => {
    return BaiduFanyiDomainOptions.map((x) => ({
      value: x,
      label: t(`baiduDomainOptions.${x}`),
    }));
  }, [t]);
// 源语言(source_lang)和 目标语言(target_lang)
  const baiduFanyiSourceLangOptions = useMemo(() => {
    return BaiduFanyiSourceLangOptions.map((x) => ({
      value: x,
      label: t(`baiduSourceLangOptions.${x}`),
    }));
  }, [t]);

  return (
    <Form
      name="basic" //表单名称
      autoComplete="off" //关闭自动填充
      form={form} //绑定 Ant Design 表单实例,允许动态设置表单值
      onValuesChange={onValuesChange} //表单值变更时触发外部回调
      layout={'vertical'}
    >
          <!-- 允许用户输入 AppID 和 密钥,用于身份验证 -->
      <DynamicInputVariable node={node}></DynamicInputVariable>
      <Form.Item label={t('appid')} name={'appid'}>
        <Input></Input>
      </Form.Item>
      <Form.Item label={t('secretKey')} name={'secret_key'}>
        <Input></Input>
      </Form.Item>
          <!-- 翻译类型 -->
      <Form.Item label={t('transType')} name={'trans_type'}>
        <Select options={options}></Select>
      </Form.Item>
          <!-- 领域翻译 仅在 fieldtranslate 选中时显示 -->
  <!-- dependencies={['model_type']}:监听 model_type 字段的变化 -->
      <Form.Item noStyle dependencies={['model_type']}>
        {({ getFieldValue }) =>
          getFieldValue('trans_type') === 'fieldtranslate' && (
            <Form.Item label={t('domain')} name={'domain'}>
              <Select options={baiduFanyiOptions}></Select>
            </Form.Item>
          )
        }
      </Form.Item>
      <Form.Item label={t('sourceLang')} name={'source_lang'}>
        <Select options={baiduFanyiSourceLangOptions}></Select>
      </Form.Item>
      <Form.Item label={t('targetLang')} name={'target_lang'}>
        <Select options={baiduFanyiSourceLangOptions}></Select>
      </Form.Item>
    </Form>
  );
};
export default BaiduFanyiForm;

BaiduFanyiForm 组件是百度翻译 API 参数的前端配置表单,支持动态交互(如 trans_type 选择 fieldtranslate 时动态显示 domain 选项)。该组件用于某个流程(flow)系统,允许用户输入翻译 API 相关信息,并提供 国际化支持useTranslate()



_ _ init _ _.py

首先在 agent/component/__init__.py 在组件同级目录下有一个__init__.py
它的主要作用是**导入并管理各种组件,这些组件用于数据处理、信息检索、翻译、SQL执行、财经数据获取等**不同功能。其中的重要代码:

# import importlib:用于动态导入模块(在 component_class 方法中使用)
# 这些组件分布在 agent.component 模块下,提供不同的功能

'''
该函数用于动态加载组件,通过 importlib.import_module("agent.component") 导入 agent.component 模块,然后使用 getattr(m, class_name) 获取类,从而返回对应的组件类。
'''
def component_class(class_name):
    m = importlib.import_module("agent.component")
    c = getattr(m, class_name)
    return c

'''
__all__ 列表:指定模块对外暴露的所有组件
使其成为 from module import * 时可以被访问的对象。
'''
__all__ = [
    "GitHub",
    "GitHubParam",
    "BaiduFanyi",
    "BaiduFanyiParam",
]

web/src/pages/flow/flow-drawer/index.tsx

web/src/pages/flow/flow-drawer/index.tsx里有这样一行代码
import BaiduFanyiForm from '../form/baidu-fanyi-form';
该文件 index.tsx 主要用于**表单抽屉组件 (FormDrawer)**,用于在 UI 中展示不同操作节点的表单,并支持编辑、调试等功能。

代码功能概览

🔹 动态渲染不同的表单组件BaiduFanyiForm 等)
🔹 支持不同类型的操作节点(Operator)(如 BaiduFanyi, Google, Generate 等)
🔹 提供表单输入交互,并支持单步调试
🔹 使用 Drawer 组件作为侧边栏弹出窗口

★ 组件导入

import { Drawer, Flex, Form, Input } from 'antd';
import { get, isPlainObject, lowerFirst } from 'lodash';
import { Play } from 'lucide-react';
import { CloseOutlined } from '@ant-design/icons';
...

📌 作用:引入 Ant Design 组件(DrawerFormInput)、lodash 工具库以及 Play(调试按钮)、CloseOutlined(关闭按钮)。

★ 引入表单组件

# 当然不能忘了我们的百度翻译
import BaiduFanyiForm from '../form/baidu-fanyi-form';
import GoogleForm from '../form/google-form';
import GenerateForm from '../form/generate-form';
import RetrievalForm from '../form/retrieval-form';
...

📌 作用:引入 BaiduFanyiForm 组件,该组件用于百度翻译的表单交互。

✅ 这些表单组件用于不同的 Operator 操作,例如:

  • BaiduFanyiForm百度翻译表单
  • GoogleFormGoogle 搜索表单
  • GenerateForm文本生成表单
  • RetrievalForm信息检索表单

★ FormMap(操作类型与表6单组件的映射)

const FormMap = {
  [Operator.BaiduFanyi]: BaiduFanyiForm,
  [Operator.Google]: GoogleForm,
  [Operator.Generate]: GenerateForm,
  [Operator.Retrieval]: RetrievalForm,
};

📌 作用
动态匹配 Operator(操作类型)与 对应表单组件
OperatorBaiduFanyi 时,渲染 BaiduFanyiForm
可以扩展不同的表单类型

☆ FormDrawer组件

const FormDrawer = ({ visible, hideModal, node, singleDebugDrawerVisible, hideSingleDebugDrawer, showSingleDebugDrawer }: IModalProps<any> & IProps) => {

📌 作用FormDrawer 是一个 表单抽屉组件,用于展示不同类型的表单。

☆ 动态渲染表单

const OperatorForm = FormMap[operatorName] ?? EmptyContent;

📌 作用: ✅ 根据 Operator(操作类型)选择对应的表单组件
✅ 如果 Operator 不在 FormMap 中,则显示 EmptyContent(空组件)

☆ 处理表单数据

useEffect(() => {
  if (visible) {
    form.resetFields();
    form.setFieldsValue(node?.data?.form);
  }
}, [visible, form, node?.data?.form]);

📌 作用: ✅ 当抽屉可见时,重置表单并填充数据
如果 OperatorCategorize,则特殊处理分类数据

☆ 侧边栏 (Drawer)

<Drawer
  title={
    <Flex>
      <OperatorIcon name={operatorName} />
      <Input value={name} onChange={handleNameChange} />
      <Play onClick={showSingleDebugDrawer} />
      <CloseOutlined onClick={hideModal} />
    </Flex>
  }
  open={visible}
>
  <OperatorForm onValuesChange={handleValuesChange} form={form} node={node} />
</Drawer>

📌 作用: ✅ 标题区域包含操作图标、输入框、调试按钮、关闭按钮
使用 OperatorForm 组件渲染表单
点击 Play 按钮打开调试模式


★ ★ 总结一下BaiduFanyiForm相关逻辑 ★ ★
index.tsx 里,BaiduFanyiForm 被注册到 FormMap,然后通过 FormDrawer 组件

🚀 如何渲染 BaiduFanyiForm

  1. Operator.BaiduFanyi 触发 BaiduFanyiForm
  2. FormDrawer 组件动态匹配 FormMap[Operator.BaiduFanyi]
  3. BaiduFanyiForm 渲染在 Drawer

📌 结论: ✅ BaiduFanyiForm 只是众多表单组件之一,专门用于百度翻译的功能
index.tsx 主要是表单的动态加载器,统一管理所有 Operator 类型
FormDrawer 负责 UI 渲染、数据填充、调试交互等


agent/component/baidufanyi.py 【后端代码】
agent/component/_ _ init _ _.py 【初始化】
③ web/src/pages/flow/flow-drawer/index.tsx【表单抽屉组件 UI展示】
④ web/src/pages/flow/form/baidu-fanyi-form/index.tsx 【提供用户界面让用户能配置并提交参数】
⑤ web/src/pages/flow/constant.tsx 【定义一个应用程序中使用的各种图标、常量、枚举、接口、状态和函数】
⑥ web/src/locales/zh.ts 【把所有要显示的搞到zh.ts中(前端组件描述)】
⑦ web/src/pages/agent/constant.tsx【引入组件所需的各种图标、以及操作项的样式、初始值、语言等】
⑧ web/src/pages/agent/form-sheet/use-form-config-map.tsx 【表单配置映射、配置不同操作对应表单组件】
⑨ web/src/pages/agent/form/baidu-fanyi-form/index.tsx 【构建配置表单 】
⑩ web/src/pages/agent/hooks.tsx 【管理图形界面流程图 各种钩子 → 建数据处理流程】


阅读全文

RAGFlow

2025/3/5

什么是RAGFlow

RAGFlow 是一种基于生成模型的技术,常用于智能聊天系统、推荐系统和自然语言处理任务。如果你是从零基础开始学习 RAGFlow(可能指的是 Retrieval-Augmented Generation Flow 或与其相关的模型/框架),以下是一些推荐的学习路径:

1. 理解基础概念

在学习 RAGFlow 之前,你需要了解一些相关的基本概念:

  • 自然语言处理(NLP):学习 NLP 的基本概念,比如分词、词向量(Word Embedding)、序列建模等。
  • 机器学习基础:了解机器学习的基本原理,包括监督学习、无监督学习、深度学习等。
  • 深度学习框架:熟悉一些常用的深度学习框架,如 TensorFlow 和 PyTorch,它们会在训练和实现 RAGFlow 模型时用到。

ragflow/README_zh.md at main · infiniflow/ragflow

先装个win版的Docker Desktop
打开 Anaconda Prompt 先放着不管

克隆 RAGFlow 的仓库 $ git clone https://github.com/infiniflow/ragflow.git

——————————— ★ ★ ———————————

Anaconda Prompt的命令窗口里
输入 cd E:\Python\ragflow\docker
接着 E:
然后就开始 装载 查看状态

关闭 docker-compose down
装载 docker compose up -d
查看状态 docker logs -f ragflow-server
——————————— ★ ★ ———————————

在你的 docker-compose.yml 配置中,Redis 的连接问题可能是因为 RAGFlow 配置文件中的 Redis 部分使用了密码,并且 Redis 运行在 localhost 上。如果 Redis 没有设置密码且端口号是 localhost,你可以尝试以下步骤进行调整:

  1. 修改 RAGFlow 的配置:在 ragflow 服务的配置中,确保 Redis 的连接信息正确。需要确保 Redis 连接时不使用密码并且连接到 localhost

  2. 更新 service_conf.yaml 配置:根据日志,RAGFlow 连接 Redis 时使用了密码。你需要检查 service_conf.yaml 中的 Redis 配置信息,将其修改为不使用密码,并指定 localhost

    示例修改:

      host: localhost
      port: 6379
      db: 1
      password: ""
    
  3. 检查 docker-compose.yml 配置:确保 ragflow 服务的 Redis 部分没有设置密码,且连接端口是 localhost。根据你的配置,默认情况下应该是没有密码的。

  4. 重新启动服务:在修改配置后,可以重新启动 Docker 服务以使修改生效。

    执行命令:

    docker-compose down
    docker-compose up -d
    

这样,你应该能够解决 Redis 连接的相关问题。如果还有其他问题,请继续提供日志或详细信息。

如果改了配置还是不行 ↓

检查与解决步骤

1. 确保 service_conf.yaml 已正确修改

你可以进入 RAGFlow 容器,查看 RAGFlow 读取的配置是否正确:

docker exec -it ragflow-server cat /ragflow/conf/service_conf.yaml

检查 redis 配置项是否是:

  redis:
  host: ragflow-redis  
  port: 6379
  db: 1
  password: ""  

如果不是,说明 docker-compose 可能仍然使用了旧的配置,你需要手动更新。

1. 确保 service_conf.yaml 绑定到了容器

检查 docker-compose.yml 是否正确挂载 service_conf.yaml
docker-compose.yml 里应该有:

yaml复制编辑volumes:
  - ./service_conf.yaml:/ragflow/conf/service_conf.yaml

如果没有这行,说明 service_conf.yaml 没有正确挂载,导致 RAGFlow 仍然使用容器内部的默认配置。

如果你修改了 docker-compose.yml,请执行:

docker-compose down
docker-compose up -d

然后 重新检查 容器内的 service_conf.yaml 是否正确:

docker exec -it ragflow-server cat /ragflow/conf/service_conf.yaml

你需要在请求中提供正确的凭证。根据之前你提供的账号密码 elastic:infini_rag_flow,你可以尝试以下命令:

docker exec -it 3a50f571ce26 curl -u elastic:infini_rag_flow http://localhost:9200

关于RAGFlow的顶级逆天Bug解决办法>

这个不是BUG,是使用问题,如果你是如下和我一样的报错,可以参考我的解决步骤
是使用问题,我这边完美解决了这个问题,解决步骤如下:
问题: docker logs -f 917c48df8473(infiniflow/ragflow:v0.16.0的容器日志)
peewee.OperationalError: (1045, “Access denied for user ‘root‘@’172.24.0.6’ (using password: YES)”)

问题原因,这个镜像的mysql密码一致是默认密码,只要配置没改全就会报错

解决办法:
  1. 停止并移除现有容器
    cd ./docker
    docker compose -f docker-compose.yml down
  2. 删除或备份数据卷
    备份命令:
    cp -r /home/data/docker/volumes/docker_esdata01 /backup/docker_esdata01_backup
    cp -r /home/data/docker/volumes/docker_minio_data /backup/docker_minio_data_backup
    cp -r /home/data/docker/volumes/docker_mysql_data /backup/docker_mysql_data_backup
    cp -r /home/data/docker/volumes/docker_redis_data /backup/docker_redis_data_backup
    删除命令:
    docker volume rm docker_esdata01
    docker volume rm docker_minio_data
    docker volume rm docker_mysql_data
    docker volume rm docker_redis_data
  3. 删除未使用的网络配置
    docker network prune
  4. 修改docker/.env 、 docker/service_conf.yaml.template和conf/service_conf.yaml 这三个文件
    中所有的密码,设置成自己的,务必这三处都保持一致(重要)
  5. 重新启动服务
    cd ./docker
    [root@localhost docker]# docker compose -f docker-compose.yml up -d
    WARN[0000] The “HF_ENDPOINT” variable is not set. Defaulting to a blank string.
    WARN[0000] The “MACOS” variable is not set. Defaulting to a blank string.
    [+] Running 10/10
    ✔ Network docker_ragflow Created 0.0s
    ✔ Volume “docker_esdata01” Created 0.0s
    ✔ Volume “docker_mysql_data” Created 0.0s
    ✔ Volume “docker_minio_data” Created 0.0s
    ✔ Volume “docker_redis_data” Created 0.0s
    ✔ Container ragflow-minio Started 0.4s
    ✔ Container ragflow-es-01 Started 0.4s
    ✔ Container ragflow-redis Started 0.4s
    ✔ Container ragflow-mysql Healthy 10.9s
    ✔ Container ragflow-server Started 11.1s

6.查看日志看看,没有如下报错了:

2025-02-26 10:51:53,554 INFO 20 TaskExecutor: RAGFlow version: v0.16.0 full
2025-02-26 10:51:53,554 INFO 20 Use Elasticsearch http://es01:9200 as the doc engine.
2025-02-26 10:51:53,563 INFO 20 GET http://es01:9200/ [status:200 duration:0.007s]
2025-02-26 10:51:53,566 INFO 20 HEAD http://es01:9200/ [status:200 duration:0.003s]
2025-02-26 10:51:53,566 INFO 20 Elasticsearch http://es01:9200 is healthy.
2025-02-26 10:51:53,572 WARNING 20 Load term.freq FAIL!
2025-02-26 10:51:53,577 WARNING 20 Realtime synonym is disabled, since no redis connection.
2025-02-26 10:51:53,583 WARNING 20 Load term.freq FAIL!
2025-02-26 10:51:53,588 WARNING 20 Realtime synonym is disabled, since no redis connection.
2025-02-26 10:51:53,588 INFO 20 MAX_CONTENT_LENGTH: 134217728
2025-02-26 10:51:53,588 INFO 20 SERVER_QUEUE_MAX_LEN: 1024
2025-02-26 10:51:53,588 INFO 20 SERVER_QUEUE_RETENTION: 3600
2025-02-26 10:51:53,588 INFO 20 MAX_FILE_COUNT_PER_USER: 0
2025-02-26 10:51:53,591 WARNING 20 RedisDB.queue_info rag_flow_svr_queue got exception: no such key
2025-02-26 10:51:53,592 INFO 20 task_consumer_0 reported heartbeat: {“name”: “task_consumer_0”, “now”: “2025-02-26T10:51:53.591+08:00”, “boot_at”: “2025-02-26T10:51:53.554+08:00”, “pending”: 0, “lag”: 0, “done”: 0, “failed”: 0, “current”: null}

7.注册登录注册测试:OK

👍2

shaw/dmeta-embedding-zh
Anaconda Prompt启动: ollama pull shaw/dmeta-embedding-zh

把每个段落做成向量化的编码

阅读全文

AutoGen

2025/2/27

待完成:架构 设计思想 整个代码怎么组织起来的 怎么去满足各种需求
B站牛人UP主【DataSense】
AutoGen 技术博客系列 (一):基础介绍与入门教程这篇博客提供了对 AutoGen 的基础介绍和入门教程,包括核心 - 掘金
AutoGen 智能应用开发(一)|AutoGen 基础_哔哩哔哩_bilibili

[microsoft/autogen: A programming framework for agentic AI 🤖 PyPi: autogen-agentchat Discord: https://aka.ms/autogen-discord Office Hour: https://aka.ms/autogen-officehour] (https://github.com/microsoft/autogen)

概述

AutoGenAutoGen Studio 都是由微软开发的用于创建和管理人工智能(AI)智能体的工具,但它们在功能和目标方面存在一些差异。

AutoGen 是一款基于 AI 和大数据技术的产品设计工具,旨在通过自动提取产品需求、智能优化设计方案并生成最终的产品模型或解决方案。其核心目标是将人工干预降至最低,从而提升用户体验和系统开发效率。AutoGen 是一个开源的 Python 框架,用于定义、配置和组合 AI 代理以构建多智能体应用。它提供了一种用于描述代理行为和它们之间交互的声明性语言。AutoGen 非常灵活,可用于构建各种多智能体应用,但它需要一定的编程技能才能使用。

AutoGen Studio 是一个基于 AutoGen 框架的图形用户界面(GUI)工具。它使开发人员能够更轻松地创建和管理多智能体应用,而无需编写代码。AutoGen Studio 提供了拖放式界面和各种预构建模块,可以简化多智能体应用的开发过程。但是,AutoGen Studio 的灵活性不如 AutoGen,并且它可能不适用于需要高度定制的应用。

以下表格总结了 AutoGen 和 AutoGen Studio 的一些关键区别:

特性 AutoGen AutoGen Studio
类型 框架 GUI 工具
抽象级别 更底层 更高层
灵活度 更灵活 不太灵活
易用性 更难使用 更易使用
编程要求 需要编程技能 无需编程技能
常见用例 高度定制的多智能体应用 通用多智能体应用

总而言之,AutoGen 适合需要高度定制和灵活性的多智能体应用开发人员,而 AutoGen Studio 适合需要快速构建通用多智能体应用的开发人员。

AI + 行业 = Agent

只需要把接口复制粘贴来到AutoGen Studio的技能表
**万能API**:[天聚数行TianAPI - 应用开发者API数据调用平台] (https://www.tianapi.com/)

我们要去给智能体配置上你需要的能力!

我结合智能体我可以做到每个行业 但每个行业都有自己角色的一个定义
我要给智能体定义角色


  1. 技术基础和领域:
    • “AutoGen” 可能依赖于特定的技术,如机器学习算法、自然语言处理模型等,
      主要用于自动化代码生成、文档自动提取或数据分析任务。
    • “AiAgent” 则是人工智能领域的核心概念,涵盖从需求理解到解决方案生成的
      全生命周期管理。它能够将不同领域的需求整合起来,提供综合性的解决方案。
  2. 应用场景:
    • 在“AutoGen”的应用中,可能包括自动化代码生成、文档自动化处理、数据分析
      工具的构建等。
    • 在“AiAgent”的应用中,可能涉及需求分析和理解,数据预处理与清洗,以及生
      成生成模型(如“AutoGen”)中的产品设计生成。
  3. 功能特点:
    • “AutoGen” 可能专注于自动化、快速迭代和标准化能力,适合需要大量重复劳
      动的任务。
    • “AiAgent” 通常结合人工智能的高级算法和分布式系统,能够处理复杂、多领
      域的问题,并提供灵活的解决方案。
  4. 研究方向与应用深度:
    • 在“AutoGen”的研究中,可能侧重于特定任务的具体实现和技术优化。
    • 在“AiAgent”的研究中,可能涉及跨领域的知识融合、分布式计算环境等高级技
      术,推动了AI领域的发展。

全自动化的 AI Agents

LangChain可以实现做AI Agent
XAgent || AutoGen 是更一步进化的

如果有与图片输出相关的,帮忙加一个GenImg(“图片生成提示 Prompt“)
利用Prompt设定可以分配任务到不同的Agent上

AutoGen 是一个框架,支持使用多个代理来开发 LLM 应用程序这些代理可以相互对话来解决任务。AutoGen 代理是可定制的、可对话的,并且无缝地允许人参与其中。他们可以结合 LLM、人工输入和不同工具的各种模式运行。

AutoGen是微软的开源框架

GitHub:microsoft/autogen: A programming framework for agentic AI 🤖 PyPi: autogen-agentchat Discord: https://aka.ms/autogen-discord Office Hour: https://aka.ms/autogen-officehour

文档博客:AutoGen — AutoGen

AutoGen 主要特点

AutoGen 可以轻松构建基于多代理对话的下一代 LLM 应用程序。它简化了复杂的LLM 工作流程的编排、自动化和优化。它最大限度地提高了 LLM 模型的性能并克服了它们的弱点。
它支持复杂工作流程的多种对话模式。借助可定制和可对话的代理,开发人员可以使用 AutoGen 构建各种涉及对话自主性、代理数量和代理对话拓扑的对话模式。
它提供了一系列具有不同复杂性的工作系统。 这些系统涵盖各种领域和复杂性的广泛应用。 这演示了 AutoGen 如何轻松支持不同的对话模式。
AutoGen 提供增强的 LLM 推理。 它提供 API统一和缓存等实用程序,以及错误处理、多配置推理、上下文编程等高级使用模式。

AutoGen 抽象并实现了可对话代理,旨在通过代理间对话来解决任务。
可对话:AutoGen 中的代理是可对话的,这意味着任何代理都可以从其他代理发送和接收消息以发起或继续对话
可定制:AutoGen 中的代理可以定制以集成 LLM、人、工具或它们的组合。


★ PyCharm怎么打开.ipynb后缀文件 我需要详细步骤 我pycharm里面没有这个?

如果 PyCharm 版本较老或者插件不可用,可以用外部 Jupyter Notebook 运行:

  1. 安装 Jupyter

    打开 PyCharm 自带的 Terminal(终端)输入:

    pip install jupyter
    
  2. 在 Terminal 里启动 Jupyter

    jupyter notebook
    
  3. 在浏览器中打开 .ipynb Jupyter Notebook 会在浏览器中打开,你可以在其中编辑 .ipynb 文件。

AutoGen基础环境配置

AutoGen基础环境安装_autogen安装-CSDN博客

windows开始菜单搜索栏,搜索prompt,搜索结果中可以看应用Aanconda Powershell Prompt

接下来,我们将使用这个工具创建一个特定版本的Python环境。
在打开的命令行工具中输入如下命令,然后回车。

conda create -n lcy python=3.10

-n 后面是的lcy是环境的名称,相当于一个标识,后续要用这个环境时通过这个名称进行查找
python=3.10,是指定python的版本

接下来,输入如下命令,切换到创建好的环境。

conda activate lcy

接下来我们需要安装Autogen Studio。
那怎么安装呢?超级简单,在刚刚我们准备好的python环境中执行一个命令就好。

pip install autogen studio

但如果直接这样执行的话,因为它会访问国外的网站完成下载,所以速度非常慢,慢到不能忍受。所以我们需要让它去国内的镜像下载。通过参数-i指定国内镜像地址,我们使用阿里云的镜像。

上述命令就变成如下这样了。

pip install autogenstudio -i https://mirrors.aliyun.com/pypi/simple

下载之后,使用如下命令启动autogen studio服务。

★ (lcy) C:\Users\Pluminary> autogenstudio ui --port 6001

之后直接访问AutoGen Studio [Beta] → [127.0.0.1:6001] (http://127.0.0.1:6001/)

在AutoGen Studio中,Team Builder是一个功能模块,允许用户配置和管理不同的Agents来构建多智能体系统。下面是对您提到的几种Agents的详细分析:

AssistantAgent
  • 用途:AssistantAgent 是一种用于处理和生成对话的智能体。它可以用来创建聊天机器人、虚拟助手或任何需要与用户进行交互的应用程序。
  • 功能
    • 对话生成:能够基于用户的输入生成自然语言响应。
    • 任务执行:可以执行特定的任务,如信息查询、简单计算或执行预设的命令。
    • 上下文管理:保持对话的上下文,使得对话更加连贯和自然。
Web Surfer Agent
  • 用途:Web Surfer Agent 专门用于从互联网上检索信息。它是一个网络爬虫,能够访问网页并提取有用的信息。
  • 功能
    • 网页访问:能够访问指定的网页,获取内容。
    • 信息提取:从网页中提取结构化或非结构化数据。
    • 数据整合:将提取的数据整合到对话或应用程序中。
Verification Assistant
  • 用途:Verification Assistant 用于验证信息的准确性或执行某些检查任务。
  • 功能
    • 数据验证:检查数据的准确性,例如验证用户输入的信息是否符合特定的格式或标准。
    • 逻辑验证:执行逻辑检查,比如验证某个流程是否按照预定的规则执行。
    • 合规性检查:确保操作符合特定的行业或法律标准。
UserProxyAgent
  • 用途:UserProxyAgent 代表用户执行操作,通常用于模拟用户行为或自动化用户任务。
  • 功能
    • 行为模拟:模拟用户的行为,例如在测试环境中模拟用户操作。
    • 任务自动化:自动化重复性的用户任务,提高效率。
    • 隐私保护:在需要保护用户隐私的场景中,代替用户执行操作,减少个人信息泄露的风险。

在Team Builder中,这些Agents可以被配置和组合,以构建复杂的工作流程。例如,一个工作流程可能首先使用Web Surfer Agent从网络上获取信息,然后由AssistantAgent处理这些信息并生成响应,最后由Verification Assistant验证响应的准确性。通过这种方式,AutoGen Studio 提供了一个灵活且强大的平台,用于创建高效且可扩展的多智能体系统。



ConversableAgent 是一个综合性的智能对话系统,它能够提供个性化和高效的交流体验

AssistantAgent(助手代理)

想象一下你有一个智能助手,比如Siri或Alexa,AssistantAgent就像是这个助手的大脑。它的任务是理解和处理用户的请求,然后做出相应的回应。在AutoGen系统中,AssistantAgent负责接收用户的信息,分析这些信息,并根据预设的规则或逻辑来执行任务。比如,如果你告诉助手“我明天有个会议”,AssistantAgent就会帮你设置提醒或者添加到日程中。

UserProxyAgent(用户代理代理)

想象你在玩一个角色扮演游戏,UserProxyAgent就像是你的角色,它代表你在游戏世界中行动。在AutoGen系统中,UserProxyAgent代表用户与系统或其他用户进行交互。它可以是用户的替身,执行用户的命令,或者代表用户参与某些活动。比如,如果你在一个在线论坛上,UserProxyAgent可以帮你发帖或者回复别人的评论。

GroupChatManager(群聊管理器)

想象你在参加一个多人在线会议或群聊,GroupChatManager就像是会议的主持人或者群聊的管理员。它的职责是协调群聊中的所有参与者,确保信息流畅且有序地传递。在AutoGen系统中,GroupChatManager负责管理群聊的各个方面,比如邀请成员加入、监控聊天内容、维持聊天秩序等。如果有人在群聊中发了不合适的内容,GroupChatManager可能会介入处理。


在PyCharm中新建一个基于conda的工程文件
在命令行里输入: pip install pyautogen

本地启动 DeepSeek → C:\Users\Pluminary>ollama run deepseek-r1:1.5b

配置Open WebUI

可以测试模型的问答

先安装conda
conda create -n open-webui python=3.11.0
activate open-webui

# 开源的访问大模型管理页面
pip install open-webui
配置WebUI:启动
open-webui serve
------------------
http://localhost:8080
sentence_bert_config.json: 100%|████████████████████████████████████████████████████████████| 53.0/53.0 [00:00<?, ?B/s]
tokenizer_config.json: 100%|██████████████████████████████████████████████████████████████████| 350/350 [00:00<?, ?B/s]
vocab.txt: 100%|████████████████████████████████████████████████████████████████████| 232k/232k [00:00<00:00, 1.17MB/s]
tokenizer.json: 100%|███████████████████████████████████████████████████████████████| 466k/466k [00:00<00:00, 26.0MB/s]
special_tokens_map.json: 100%|████████████████████████████████████████████████████████████████| 112/112 [00:00<?, ?B/s]
INFO:     Started server process [18264]
INFO:     Waiting for application startup.
2025-02-28 08:29:21.807 | INFO     | open_webui.utils.logger:start_logger:140 - GLOBAL_LOG_LEVEL: INFO - {}

# 等看到端口号再去访问 18264

利用Python部署本地DeepSeek并进行连通

# 用于发送 HTTP 请求。在此代码中,我们使用它与本地部署的 DeepSeek 服务进行通信
import requests
# 用于处理 JSON 数据,尤其是在发送和接收 HTTP 请求时
import json


# 定义与 DeepSeek 服务交互的类
class DeepSeekChat:
    # 这个类负责与 DeepSeek 服务进行交互 它接受两个参数:
    '''
     model:模型名称,在这个案例中是 "deepseek-r1:1.5b"。
     base_url:DeepSeek 服务的 API 基础 URL,即本地部署的 DeepSeek 服务地址。
     
我们通常使用self作为第一个参数的名称。这是为了代码的可读性和一致性。这个约定使得其他阅读你代码的Python程序员能够立即识别出self代表的是类的实例。
    '''
    def __init__(self, model, base_url):
        self.model = model
        self.base_url = base_url

    # send_message 方法:该方法接收一个消息列表 messages
    # 并构建一个 POST 请求的有效负载(payload),包括模型名称和消息内容。messages 是一个包含多条消息的列表。
    def send_message(self, messages):
        # 构建请求数据
        payload = {
            "model": self.model,
            "messages": messages
        }
        # 设置请求头,指定请求体的内容类型为 application/json
        headers = {
            "Content-Type": "application/json"
        }

        # 发送请求到本地 DeepSeek 服务
 '''
使用 requests.post() 发送 HTTP POST 请求到 DeepSeek 服务的 API 地址 self.base_url
json.dumps(payload) 将请求体的负载 payload 转换为 JSON 格式的字符串。
 '''
        response = requests.post(self.base_url, headers=headers, data=json.dumps(payload))

        if response.status_code == 200:
            return response.json()  # 返回 DeepSeek 返回的响应
        else:
            return {"error": f"Request failed with status code {response.status_code}"}


# 定义初始化消息 定义启动对话函数
'''
此函数启动并管理与用户的对话。它定义了一个初始化的 messages 列表,其中包含一个系统消息,系统消息的角色是 "system",内容是 "You are a helpful assistant.",这将告诉模型其应扮演的角色
'''
def start_conversation():
    messages = [
        {"role": "system", "content": "You are a helpful assistant."}  # 系统消息定义角色
    ]

    # 初始化 DeepSeekChat 实例
    deepseek_chat = DeepSeekChat(
        model="deepseek-r1:1.5b",  # 使用的本地模型
        base_url="http://localhost:11434/v1/chat/completions"  # 本地服务 URL
    )
# 创建一个 DeepSeekChat 类的实例 deepseek_chat,并传入本地模型和服务的 URL
    while True:
        try:
            # 获取用户输入问题
            user_input = input("You: ")

            # 如果用户输入 "exit" 退出循环
            # 进入一个 while True 循环,不断等待用户输入
            #用户的每次输入都会被捕获并存储在 user_input 变量中。

            if user_input.lower() == "exit":
                print("Exiting the conversation.")
                break

            # 添加用户消息
             essages.append({"role": "user", "content": user_input})

            # 发送用户消息并获取模型回答 将用户的输入添加到 messages 列表中,消息的角色设置为 "user"。
            response = deepseek_chat.send_message(messages)

            # 检查响应是否包含错误
            if "error" in response:
                print(response["error"])
            else:
                # 打印 DeepSeek 模型的回答
                answer = response.get("choices", [{}])[0].get("message", {}).get("content", "No response")
                print("DeepSeek: " + answer)

        except KeyboardInterrupt:
            print("\nExiting the conversation due to user interrupt.")
            break
'''
if __name__ == "__main__": 语句保证只有当脚本作为主程序执行时才会调用 start_conversation 函数。如果该脚本被作为模块导入到其他脚本中,则不会执行该函数。
'''

if __name__ == "__main__":
    start_conversation()
假设你已经理解了 DeepSeek 的基础代码,下面的例子演示了如何使用 AutoGen 集成 DeepSeek:
import requests
from autogen import Agent, GroupChat, GroupChatManager
from autogen.models.openai import OpenAIChatCompletionClient
import json

# 定义与 DeepSeek 服务交互的类
class DeepSeekChat:
    def __init__(self, model, base_url):
        self.model = model
        self.base_url = base_url

    def send_message(self, messages):
        payload = {
            "model": self.model,
            "messages": messages
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(self.base_url, headers=headers, data=json.dumps(payload))
        
        if response.status_code == 200:
            return response.json()
        else:
            return {"error": f"Request failed with status code {response.status_code}"}

# 创建自定义代理(DeepSeek 代理)
class DeepSeekAgent(Agent):
    def __init__(self, model, base_url):
        super().__init__(name="DeepSeekAgent")
        self.chat = DeepSeekChat(model, base_url)

    def on_message(self, message):
        response = self.chat.send_message([{"role": "user", "content": message}])
        return response.get("choices", [{}])[0].get("message", {}).get("content", "No response")

# 创建用户代理
class UserAgent(Agent):
    def __init__(self):
        super().__init__(name="UserAgent")

    def on_message(self, message):
        return message

# 设置 DeepSeek 代理
deepseek_agent = DeepSeekAgent(model="deepseek-r1:1.5b", base_url="http://localhost:11434/v1/chat/completions")
user_agent = UserAgent()

# 创建群聊
group_chat = GroupChat(agents=[deepseek_agent, user_agent])
group_chat_manager = GroupChatManager(group_chat)

# 启动对话
def start_conversation():
    while True:
        try:
            user_input = input("You: ")
            if user_input.lower() == "exit":
                print("Exiting the conversation.")
                break
            
            response = group_chat_manager.handle_message(user_input)
            print(f"DeepSeek: {response}")
        
        except KeyboardInterrupt:
            print("\nExiting the conversation due to user interrupt.")
            break

if __name__ == "__main__":
    start_conversation()
阅读全文

python

2025/2/26

Python 基础初级

print函数输出数组
py3.x使用输出集合
list = ["a","b","c"]
print (list)
-------------------
["a","b","c"]
带引号的是同一行显示多条语句
print('hello'); print('world')
py代码块不能使用大括号 只能用缩进
if True:
    print('true')
else:
    print('false')
多行语句可以用斜杠( \ )
多行注释可以用 单引号 也可以用双引号
# 多引号
"""
111

1
"""

# 单引号
'''
1
1
1
'''
print输出是否换行 不换行就末尾加逗号 在py3之后就要用end=” “
x = "a"
y = "b"
print(x, end=" ")
print(y)
整型 浮点 字符串
counter = 100
miles = 100.0
name = "Runoob"

print(counter)
print(miles)
print(name)
加号是字符串连接运算符 型号是重复操作
str = "Hello World!"
print(str * 2)
print(str + "TEST")

list = ['sac', 123, 2.32, 'john', 70.2]
print list[2:] # 输出从第三个开始至列表末尾的所有元素
-------------------
Hello World!Hello World!
Hello World!TEST

[2.32, 'john', 70.2]
列表是有序的对象集合,字典是无序的对象集合

两者之间的区别在于:字典当中的元素是通过键来存取的,而不是通过偏移存储
字典用’{ }’标识,字典由索引(key)和它对应的值value组成

Python中的集合(Set)、元组(Tuple)和字典(Dictionary)是三种不同的数据结构,它们各自有不同的特点和用途:

  1. 集合(Set)
    • 无序的集合数据类型,其中每个元素都是唯一的,没有重复的元素。
    • 支持集合的数学运算,如并集、交集、差集等。
    • 用大括号 {} 或者 set() 函数来创建,但空集合只能用 set() 创建,因为 {} 创建的是空字典。
    • 例子:{1, 2, 3}
  2. 元组(Tuple)
    • 有序的集合数据类型,元素用圆括号 () 括起来。
    • 元组中的元素不可修改,即一旦创建,就不能更改其内容。
    • 元组可以包含不同类型的元素。
    • 例子:(1, "hello", 3.14)
  3. 字典(Dictionary)
    • 无序的键值对集合,每个键都是唯一的。
    • 键和值之间用冒号 : 分隔,多个键值对之间用逗号 , 分隔,整个字典包括在大括号 {} 中。
    • 键必须是不可变的数据类型,如字符串、数字或元组,而值可以是任何数据类型。
    • 可以通过键来访问、修改或删除对应的值。
    • 例子:{"name": "Alice", "age": 25}

总结区别:

  • 唯一性:集合和字典中的元素(对于集合是元素,对于字典是键)都是唯一的,而元组中的元素可以重复。
  • 可变性:集合和字典是可变的,可以添加、删除元素,而元组是不可变的。
  • 有序性:元组是有序的,可以索引访问;集合和字典是无序的,不能通过索引访问。
  • 用途:集合用于处理集合操作,字典用于存储键值对,元组用于存储不可变的有序元素
tinydict = {'name': 'runoob','code':6734, 'dept':'sales'}
print(tinydict)
print(tinydict.keys())
print(tinydict.values())
------------------------------------
{'name': 'runoob', 'code': 6734, 'dept': 'sales'}
dict_keys(['name', 'code', 'dept'])
dict_values(['runoob', 6734, 'sales'])
算术运算符

‘ 两个数相乘
‘%’ 返回除法的余数 — 取模
‘**’ 幂 返回x的y次幂 a * * b为10的20次方
*=’ c * * = a 等效于 c = c * * a

比较运算符

‘<>’ 不等于 类似于 !=

运算符 描述 实例
& 按位与运算符:参与运算的两个值,如果两个相应位都为1,则该位的结果为1,否则为0 (a & b) 输出结果 12 ,二进制解释: 0000 1100
| 按位或运算符:只要对应的二个二进位有一个为1时,结果位就为1。 (a | b) 输出结果 61 ,二进制解释: 0011 1101
^ 按位异或运算符:当两对应的二进位相异时,结果为1 (a ^ b) 输出结果 49 ,二进制解释: 0011 0001
~ 按位取反运算符:将二进制表示中的每一位取反,0 变为 1,1 变为 0。**~x** 类似于 -x-1 (~a ) 输出结果 -61 ,二进制解释: 1100 0011 (以补码形式表示),在一个有符号二进制数的补码形式。
<< 左移动运算符:运算数的各二进位全部左移若干位,由 << 右边的数字指定了移动的位数,高位丢弃,低位补0。 a << 2 输出结果 240 ,二进制解释: 1111 0000
>> 右移动运算符:把”>>”左边的运算数的各二进位全部右移若干位,**>>** 右边的数字指定了移动的位数 a >> 2 输出结果 15 ,二进制解释: 0000 1111
成员运算符

除了以上的一些运算符之外,Python还支持成员运算符,测试实例中包含了一系列的成员,包括字符串,列表或元组。

运算符 描述 实例
in 如果在指定的序列中找到值返回 True,否则返回 False。 x 在 y 序列中 , 如果 x 在 y 序列中返回 True。
not in 如果在指定的序列中没有找到值返回 True,否则返回 False。 x 不在 y 序列中 , 如果 x 不在 y 序列中返回 True。
a = 10
b = 20
list = [1,2,3,4,5];
if(a in list):
    print("a 在列表内")
else:
    print("a 不在列表内")
if(b not in list):
    print("b 不在列表内")
else:
    print("b 在列表内")
--------------------------
a 不在列表内
b 不在列表内
身份运算符
算符 描述 实例
is is 是判断两个标识符是不是引用自一个对象 x is y, 类似 id(x) == id(y) , 如果引用的是同一个对象则返回 True,否则返回 False
is not is not 是判断两个标识符是不是引用自不同对象 x is not y , 类似 **id(a) != id(b)**。如果引用的不是同一个对象则返回结果 True,否则返回 False。
a1 = 20
b1 = 20
if(a1 is b1):
    print("a1 与 b1 是相同的对象")
else:
    print("a1 与 b1 不是相同的对象")

if(a1 is not b1):
    print("a1 与 b1 不是相同的对象")
else:
    print("a1 与 b1 是相同的对象")
----------------------------------
a1 与 b1 是相同的对象
a1 与 b1 是相同的对象
while循环语句 + continue + break
count = 0
while(count < 9):
    print('The count is:', count)
    count = count + 1
print("Good bye!")
--------------------------------
The count is: 0
The count is: 1
The count is: 2
The count is: 3
The count is: 4
The count is: 5
The count is: 6
The count is: 7
The count is: 8
Good bye!
count = 0
while(count < 9):
    print('The count is:', count)
    count = count + 1
    print("Good bye!")
---------------------------------
The count is: 0
Good bye!
The count is: 1
Good bye!
The count is: 2
Good bye!
The count is: 3
Good bye!
The count is: 4
Good bye!
The count is: 5
Good bye!
The count is: 6
Good bye!
The count is: 7
Good bye!
The count is: 8
Good bye!

# continue 和 break 用法
 
i = 1
while i < 10:   
    i += 1
    if i%2 > 0:     # 非双数时跳过输出
        continue
    print i         # 输出双数2、4、6、8、10
 
i = 1
while 1:            # 循环条件为1必定成立
    print i         # 输出1~10
    i += 1
    if i > 10:     # 当i大于10时跳出循环
        break
for循环语句
for letter in 'Python':
    print('当前字母 : %s' %letter)

fruits = ['banana', 'apple',  'mango']
for fruit in fruits:
    print('当前水果 :', fruit)
print('Good bye!')
--------------------------------------
当前字母 : P
当前字母 : y
当前字母 : t
当前字母 : h
当前字母 : o
当前字母 : n
当前水果 : banana
当前水果 : apple
当前水果 : mango
Good bye!
循环嵌套

Python 语言允许在一个循环体里面嵌入另一个循环。

Python for 循环嵌套语法:
for iterating_var in sequence:
   for iterating_var in sequence:
      statements(s)
   statements(s)
Python while 循环嵌套语法:
while expression:
   while expression:
      statement(s)
   statement(s)
Python break 用法
for letter in 'python':
    if letter == 'h':
        break
    print('当前字母 :', letter)
------------------------------
当前字母 : p
当前字母 : y
当前字母 : t
Python pass 用法
在 Python 中有时候会看到一个 def 函数:

def sample(n_samples):
    pass

该处的 pass 便是占据一个位置,因为如果定义一个空函数程序会报错
当你没有想好函数的内容是可以用 pass 填充,使程序可以正常运行

-----------------------------------

在 Python3.x 的时候 pass 可以写或不写。
python2.x:

def function():
    # 空函数在Python2.x版本中pass是必须的
    pass
python3.x

def function():
    # 在Python3.x的时候pass可以写或不写
    pass
Python Number数字

Python Number 数据类型用于存储数值。
数据类型是不允许改变的,这就意味着如果改变 Number 数据类型的值,将重新分配内存空间。
以下实例在变量赋值时 Number 对象将被创建:

var1 = 1
var2 = 10

您也可以使用del语句删除一些 Number 对象引用。
del语句的语法是:

del var1[,var2[,var3[....,varN]]]]

您可以通过使用del语句删除单个或多个对象,例如:

del var
del var_a, var_b

Python 支持四种不同的数值类型:

  • 整型(Int) - 通常被称为是整型或整数,是正或负整数,不带小数点。

  • 长整型(long integers) - 无限大小的整数,整数最后是一个大写或小写的L。

  • 浮点型(floating point real values) - 浮点型由整数部分与小数部分组成,浮点型也可以使用科学计数法表示(2.5e2 = 2.5 x 102 = 250)

  • 复数(complex numbers) - 复数由实数部分和虚数部分构成,可以用a + bj,或者complex(a,b)表示, 复数的实部a和虚部b都是浮点型。

Python Number 类型转换
int(x [,base ])         将x转换为一个整数  
long(x [,base ])        将x转换为一个长整数  
float(x )               将x转换到一个浮点数  
complex(real [,imag ])  创建一个复数  
str(x )                 将对象 x 转换为字符串  
repr(x )                将对象 x 转换为表达式字符串  
eval(str )              用来计算在字符串中的有效Python表达式,并返回一个对象  
tuple(s )               将序列 s 转换为一个元组  
list(s )                将序列 s 转换为一个列表  
chr(x )                 将一个整数转换为一个字符  
unichr(x )              将一个整数转换为Unicode字符  
ord(x )                 将一个字符转换为它的整数值  
hex(x )                 将一个整数转换为一个十六进制字符串  
oct(x )                 将一个整数转换为一个八进制字符串  
Python math 模块、cmath 模块

Python 中数学运算常用的函数基本都在 math 模块、cmath 模块中。
Python math 模块提供了许多对浮点数的数学运算函数。
Python cmath 模块包含了一些用于复数运算的函数。
cmath 模块的函数跟 math 模块函数基本一致,区别是 cmath 模块运算的是复数
math 模块运算的是数学运算。
要使用 math 或 cmath 函数必须先导入:

查看 math 查看包中的内容:

>>> import math
>>> dir(math)
['__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'acos', 'acosh', 'asin', 'asinh', 'atan', 'atan2', 'atanh', 'ceil', 'copysign', 'cos', 'cosh', 'degrees', 'e', 'erf', 'erfc', 'exp', 'expm1', 'fabs', 'factorial', 'floor', 'fmod', 'frexp', 'fsum', 'gamma', 'gcd', 'hypot', 'inf', 'isclose', 'isfinite', 'isinf', 'isnan', 'ldexp', 'lgamma', 'log', 'log10', 'log1p', 'log2', 'modf', 'nan', 'pi', 'pow', 'radians', 'sin', 'sinh', 'sqrt', 'tan', 'tanh', 'tau', 'trunc']
>>>

下文会介绍各个函数的具体应用。

查看 cmath 查看包中的内容

>>> import cmath
>>> dir(cmath)
['__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'acos', 'acosh', 'asin', 'asinh', 'atan', 'atanh', 'cos', 'cosh', 'e', 'exp', 'inf', 'infj', 'isclose', 'isfinite', 'isinf', 'isnan', 'log', 'log10', 'nan', 'nanj', 'phase', 'pi', 'polar', 'rect', 'sin', 'sinh', 'sqrt', 'tan', 'tanh', 'tau']
>>>
Python数学函数
函数 返回值 ( 描述 )
abs(x) 返回数字的绝对值,如abs(-10) 返回 10
ceil(x) 返回数字的上入整数,如math.ceil(4.1) 返回 5
cmp(x, y) 如果 x < y 返回 -1, 如果 x == y 返回 0, 如果 x > y 返回 1
exp(x) 返回e的x次幂(ex),如math.exp(1) 返回2.718281828459045
fabs(x) 以浮点数形式返回数字的绝对值,如math.fabs(-10) 返回10.0
floor(x) 返回数字的下舍整数,如math.floor(4.9)返回 4
log(x) 如math.log(math.e)返回1.0,math.log(100,10)返回2.0
log10(x) 返回以10为基数的x的对数,如math.log10(100)返回 2.0
max(x1, x2,…) 返回给定参数的最大值,参数可以为序列。
min(x1, x2,…) 返回给定参数的最小值,参数可以为序列。
modf(x) 返回x的整数部分与小数部分,两部分的数值符号与x相同,整数部分以浮点型表示。
pow(x, y) x**y 运算后的值。
[round(x ,n]) 返回浮点数x的四舍五入值,如给出n值,则代表舍入到小数点后的位数。
sqrt(x) 返回数字x的平方根
Python随机数函数

随机数可以用于数学,游戏,安全等领域中,还经常被嵌入到算法中,用以提高算法效率,并提高程序的安全性。

Python包含以下常用随机数函数:

函数 描述
choice(seq) 从序列的元素中随机挑选一个元素,比如random.choice(range(10)),从0到9中随机挑选一个整数。
[randrange (start,] stop [,step]) 从指定范围内,按指定基数递增的集合中获取一个随机数,基数默认值为 1
random() 随机生成下一个实数,它在[0,1)范围内。
seed([x]) 改变随机数生成器的种子seed。如果你不了解其原理,你不必特别去设定seed,Python会帮你选择seed。
shuffle(lst) 将序列的所有元素随机排序
uniform(x, y) 随机生成下一个实数,它在[x,y]范围内。
Python数学常量
常量 描述
pi 数学常量 pi(圆周率,一般以π来表示)
e 数学常量 e,e即自然常数(自然常数)。
Python 字符串

字符串是 Python 中最常用的数据类型。我们可以使用引号 ( ) 来创建字符串。
创建字符串很简单,只要为变量分配一个值即可。例如:

var1 = 'Hello World!'
var2 = "Python Runoob"
Python 字符串格式化符号
符 号 描述
%c 格式化字符及其ASCII码
%s 格式化字符串
%d 格式化整数
%u 格式化无符号整型
%o 格式化无符号八进制数
%x 格式化无符号十六进制数
%X 格式化无符号十六进制数(大写)
%f 格式化浮点数字,可指定小数点后的精度
%e 用科学计数法格式化浮点数
%E 作用同%e,用科学计数法格式化浮点数
%g %f和%e的简写
%G %F 和 %E 的简写
%p 用十六进制数格式化变量的地址
格式化操作符辅助指令:
符号 功能
* 定义宽度或者小数点精度
- 用做左对齐
+ 在正数前面显示加号( + )
在正数前面显示空格
# 在八进制数前面显示零(‘0’),在十六进制前面显示’0x’或者’0X’(取决于用的是’x’还是’X’)
0 显示的数字前面填充’0’而不是默认的空格
% ‘%%’输出一个单一的’%’
(var) 映射变量(字典参数)
m.n. m 是显示的最小总宽度,n 是小数点后的位数(如果可用的话)
Python 的字符串内建函数

字符串方法是从 Python1.6 到 2.0 慢慢加进来的 —— 它们也被加到了Jython 中。

这些方法实现了 string 模块的大部分方法,如下表所示列出了目前字符串内建支持的方法,所有的方法都包含了对 Unicode 的支持,有一些甚至是专门用于 Unicode 的。

方法 描述
string.capitalize() 把字符串的第一个字符大写
string.center(width) 返回一个原字符串居中,并使用空格填充至长度 width 的新字符串
string.count(str, beg=0, end=len(string)) 返回 str 在 string 里面出现的次数,如果 beg 或者 end 指定则返回指定范围内 str 出现的次数
string.decode(encoding=’UTF-8’, errors=’strict’) 以 encoding 指定的编码格式解码 string,如果出错默认报一个 ValueError 的 异 常 , 除非 errors 指 定 的 是 ‘ignore’ 或 者’replace’
string.encode(encoding=’UTF-8’, errors=’strict’) 以 encoding 指定的编码格式编码 string,如果出错默认报一个ValueError 的异常,除非 errors 指定的是’ignore’或者’replace’
string.endswith(obj, beg=0, end=len(string)) 检查字符串是否以 obj 结束,如果beg 或者 end 指定则检查指定的范围内是否以 obj 结束,如果是,返回 True,否则返回 False.
string.expandtabs(tabsize=8) 把字符串 string 中的 tab 符号转为空格,tab 符号默认的空格数是 8。
string.find(str, beg=0, end=len(string)) 检测 str 是否包含在 string 中,如果 beg 和 end 指定范围,则检查是否包含在指定范围内,如果是返回开始的索引值,否则返回-1
string.format() 格式化字符串
string.index(str, beg=0, end=len(string)) 跟find()方法一样,只不过如果str不在 string中会报一个异常.
string.isalnum() 如果 string 至少有一个字符并且所有字符都是字母或数字则返回 True,否则返回 False
string.isalpha() 如果 string 至少有一个字符并且所有字符都是字母则返回 True,否则返回 False
string.isdecimal() 如果 string 只包含十进制数字则返回 True 否则返回 False.
string.isdigit() 如果 string 只包含数字则返回 True 否则返回 False.
string.islower() 如果 string 中包含至少一个区分大小写的字符,并且所有这些(区分大小写的)字符都是小写,则返回 True,否则返回 False
string.isnumeric() 如果 string 中只包含数字字符,则返回 True,否则返回 False
string.isspace() 如果 string 中只包含空格,则返回 True,否则返回 False.
string.istitle() 如果 string 是标题化的(见 title())则返回 True,否则返回 False
string.isupper() 如果 string 中包含至少一个区分大小写的字符,并且所有这些(区分大小写的)字符都是大写,则返回 True,否则返回 False
string.join(seq) 以 string 作为分隔符,将 seq 中所有的元素(的字符串表示)合并为一个新的字符串
string.ljust(width) 返回一个原字符串左对齐,并使用空格填充至长度 width 的新字符串
string.lower() 转换 string 中所有大写字符为小写.
string.lstrip() 截掉 string 左边的空格
string.maketrans(intab, outtab) maketrans() 方法用于创建字符映射的转换表,对于接受两个参数的最简单的调用方式,第一个参数是字符串,表示需要转换的字符,第二个参数也是字符串表示转换的目标。
max(str) 返回字符串 str 中最大的字母。
min(str) 返回字符串 str 中最小的字母。
string.partition(str) 有点像 find()和 split()的结合体,从 str 出现的第一个位置起,把 字 符 串 string 分 成 一 个 3 元 素 的 元 组 (string_pre_str,str,string_post_str),如果 string 中不包含str 则 string_pre_str == string.
string.replace(str1, str2, num=string.count(str1)) 把 string 中的 str1 替换成 str2,如果 num 指定,则替换不超过 num 次.
string.rfind(str, beg=0,end=len(string) ) 类似于 find() 函数,返回字符串最后一次出现的位置,如果没有匹配项则返回 -1。
string.rindex( str, beg=0,end=len(string)) 类似于 index(),不过是返回最后一个匹配到的子字符串的索引号。
string.rjust(width) 返回一个原字符串右对齐,并使用空格填充至长度 width 的新字符串
string.rpartition(str) 类似于 partition()函数,不过是从右边开始查找
string.rstrip() 删除 string 字符串末尾的空格.
string.split(str=””, num=string.count(str)) 以 str 为分隔符切片 string,如果 num 有指定值,则仅分隔 num+1 个子字符串
[string.splitlines(keepends]) 按照行(‘\r’, ‘\r\n’, ‘\n’)分隔,返回一个包含各行作为元素的列表,如果参数 keepends 为 False,不包含换行符,如果为 True,则保留换行符。
string.startswith(obj, beg=0,end=len(string)) 检查字符串是否是以 obj 开头,是则返回 True,否则返回 False。如果beg 和 end 指定值,则在指定范围内检查.
[string.strip(obj]) 在 string 上执行 lstrip()和 rstrip()
string.swapcase() 翻转 string 中的大小写
string.title() 返回”标题化”的 string,就是说所有单词都是以大写开始,其余字母均为小写(见 istitle())
string.translate(str, del=””) 根据 str 给出的表(包含 256 个字符)转换 string 的字符,要过滤掉的字符放到 del 参数中
string.upper() 转换 string 中的小写字母为大写
string.zfill(width) 返回长度为 width 的字符串,原字符串 string 右对齐,前面填充0
访问列表中的值 && 追加列表【append】&& 删除列表元素【del】
print("----------------")
list1 = ['physics', 'chemistry', 1997, 2000]
print(list1)
print(list1[0])
list1.append('Google')
print(list1)

del list1[4] #删除了Google
print(list1)
-------------------------------
['physics', 'chemistry', 1997, 2000]
physics
['physics', 'chemistry', 1997, 2000, 'Google']
['physics', 'chemistry', 1997, 2000]  #删除了Google
Python列表脚本操作符

列表对 + 和 * 的操作符与字符串相似。+ 号用于组合列表,* 号用于重复列表。

Python 表达式 结果 描述
len([1, 2, 3]) 3 长度
[1, 2, 3] + [4, 5, 6] [1, 2, 3, 4, 5, 6] 组合
[‘Hi!’] * 4 [‘Hi!’, ‘Hi!’, ‘Hi!’, ‘Hi!’] 重复
3 in [1, 2, 3] True 元素是否存在于列表中
for x in [1, 2, 3]: print x, 1 2 3 迭代
记得负号是倒数的噢
>>>L = ['Google', 'Runoob', 'Taobao']
>>> L[2]
'Taobao'
>>> L[-2]
'Runoob'
>>> L[1:]
['Runoob', 'Taobao']
>>>
Python 表达式 结果 描述
L[2] ‘Taobao’ 读取列表中第三个元素
L[-2] ‘Runoob’ 读取列表中倒数第二个元素
L[1:] [‘Runoob’, ‘Taobao’] 从第二个元素开始截取列表
Python列表函数&方法
序号 函数
1 cmp(list1, list2) 比较两个列表的元素
2 len(list) 列表元素个数
3 max(list) 返回列表元素最大值
4 min(list) 返回列表元素最小值
5 list(seq) 将元组转换为列表
序号 方法
1 list.append(obj) 在列表末尾添加新的对象
2 list.count(obj) 统计某个元素在列表中出现的次数
3 list.extend(seq) 在列表末尾一次性追加另一个序列中的多个值(用新列表扩展原来的列表)
4 list.index(obj) 从列表中找出某个值第一个匹配项的索引位置
5 list.insert(index, obj) 将对象插入列表
6 [list.pop(index=-1]) 移除列表中的一个元素(默认最后一个元素),并且返回该元素的值
7 list.remove(obj) 移除列表中某个值的第一个匹配项
8 list.reverse() 反向列表中元素
9 list.sort(cmp=None, key=None, reverse=False) 对原列表进行排序
Python日期和时间

Python 程序能用很多方式处理日期和时间,转换日期格式是一个常见的功能。
Python 提供了一个 time 和 calendar 模块可以用于格式化日期和时间。
时间间隔是以秒为单位的浮点小数。
每个时间戳都以自从1970年1月1日午夜(历元)经过了多长时间来表示。
Python 的 time 模块下有很多函数可以转换常见日期格式。
如函数time.time()用于获取当前时间戳, 如下实例:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import time  # 引入time模块
 
ticks = time.time()
print "当前时间戳为:", ticks
--------------------------
当前时间戳为: 1459994552.51
获取某月日历

time.time( ) 返回当前时间的时间戳(1970纪元后经过的浮点秒数)
time.sleep(secs) 推迟调用线程的运行,secs指秒数。
time.localtime([secs]) 接收时间戳(1970纪元后经过的浮点秒数)并返回当地时间下的时间元组t(t.tm_isdst可取0或1,取决于当地当时是不是夏令时)。

import  calendar
cal = calendar.month(2025, 2)
print("以下输出是 2025 年 2 月的日历:")
print(cal)
------------------------------------
以下输出是 2025 年 2 月的日历:
   February 2025
Mo Tu We Th Fr Sa Su
                1  2
 3  4  5  6  7  8  9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28
Python函数

你可以定义一个由自己想要功能的函数,以下是简单的规则:

  • 函数代码块以 def 关键词开头,后接函数标识符名称和圆括号**()**。
  • 任何传入参数和自变量必须放在圆括号中间。圆括号之间可以用于定义参数。
  • 函数的第一行语句可以选择性地使用文档字符串—用于存放函数说明。
  • 函数内容以冒号起始,并且缩进。
  • return [表达式] 结束函数,选择性地返回一个值给调用方。不带表达式的return相当于返回 None。
def functionname( parameters ):
   "函数_文档字符串"
   function_suite
   return [expression]

默认情况下,参数值和参数名称是按函数声明中定义的顺序匹配起来的。

def readme(str):
    print(str)
    return
readme("Hello World!")
readme("Runoob")
----------------
Hello World!
Runoob
不定长参数

你可能需要一个函数能处理比当初声明时更多的参数。这些参数叫做不定长参数,和上述2种参数不同,声明时不会命名。基本语法如下:

def functionname([formal_args,] *var_args_tuple ):
   "函数_文档字符串"
   function_suite
   return [expression]

加了星号(*)的变量名会存放所有未命名的变量参数。不定长参数实例如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
# 可写函数说明
def printinfo( arg1, *vartuple ):
   "打印任何传入的参数"
   print "输出: "
   print arg1
   for var in vartuple:
      print var
   return
 
# 调用printinfo 函数
printinfo( 10 )
printinfo( 70, 60, 50 )
------------------------------
输出:
10
输出:
70
60
50
匿名函数

python 使用 lambda 来创建匿名函数。

  • lambda只是一个表达式,函数体比def简单很多。
  • lambda的主体是一个表达式,而不是一个代码块。仅仅能在lambda表达式中封装有限的逻辑进去。
  • lambda函数拥有自己的命名空间,且不能访问自有参数列表之外或全局命名空间里的参数。
  • 虽然lambda函数看起来只能写一行,却不等同于C或C++的内联函数,后者的目的是调用小函数时不占用栈内存从而增加运行效率。

lambda函数的语法只包含一个语句,如下:

lambda [arg1 [,arg2,.....argn]]:expressio

实例如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
# 可写函数说明
sum = lambda arg1, arg2: arg1 + arg2
 
# 调用sum函数
print "相加后的值为 : ", sum( 10, 20 )
print "相加后的值为 : ", sum( 20, 20 )
---------------------------------------
相加后的值为 :  30
相加后的值为 :  40

Python 中的 lambda 表达式是一种匿名函数,即没有名字的函数。lambda 表达式通常用于需要短小精悍函数的场景,比如排序时的键函数,或者作为高阶函数的参数。

lambda 表达式的语法如下:

lambda arguments: expression

这里的关键点如下:

  • lambda 是一个关键字,用于定义匿名函数。
  • arguments 是函数的参数,可以有多个,用逗号分隔。
  • expression 是一个表达式,其结果将作为函数的返回值。

以下是一些 lambda 表达式的使用示例:

示例 1:定义一个简单的加法函数

add = lambda x, y: x + y
print(add(5, 3))  # 输出 8

示例 2:列表排序

使用 lambda 表达式来按照列表中元素的某个属性进行排序:

# 假设有一个包含字典的列表
list_of_dicts = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 20}, {'name': 'Charlie', 'age': 30}]

# 按照年龄排序
sorted_list = sorted(list_of_dicts, key=lambda x: x['age'])
print(sorted_list)  # 输出 [{'name': 'Bob', 'age': 20}, {'name': 'Alice', 'age': 25}, {'name': 'Charlie', 'age': 30}]

示例 3:高阶函数中的使用

lambda 表达式常用于 map, filter, reduce 等高阶函数:

# 使用 map 函数将列表中的每个元素乘以2
numbers = [1, 2, 3, 4, 5]
doubled = map(lambda x: x * 2, numbers)
print(list(doubled))  # 输出 [2, 4, 6, 8, 10]

# 使用 filter 函数过滤出列表中的偶数
even_numbers = filter(lambda x: x % 2 == 0, numbers)
print(list(even_numbers))  # 输出 [2, 4]

需要注意的是,虽然 lambda 表达式非常方便,但它们通常只用于简单的函数。如果函数逻辑较为复杂,最好还是使用常规的 def 语句来定义函数,这样可以提高代码的可读性和可维护性。

return 语句

return语句[表达式]退出函数,选择性地向调用方返回一个表达式。不带参数值的return语句返回None。之前的例子都没有示范如何返回数值,下例便告诉你怎么做:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
# 可写函数说明
def sum( arg1, arg2 ):
   # 返回2个参数的和."
   total = arg1 + arg2
   print "函数内 : ", total
   return total
 
# 调用sum函数
total = sum( 10, 20 )
全局变量和局部变量

定义在函数内部的变量拥有一个局部作用域,定义在函数外的拥有全局作用域。

局部变量只能在其被声明的函数内部访问,而全局变量可以在整个程序范围内访问。调用函数时,所有在函数内声明的变量名称都将被加入到作用域中。如下实例:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
total = 0 # 这是一个全局变量
# 可写函数说明
def sum( arg1, arg2 ):
   #返回2个参数的和."
   total = arg1 + arg2 # total在这里是局部变量.
   print "函数内是局部变量 : ", total
   return total
 
#调用sum函数
sum( 10, 20 )
print "函数外是全局变量 : ", total
------------------------------------------
函数内是局部变量 :  30
函数外是全局变量 :  0
from…import 语句

Python 的 from 语句让你从模块中导入一个指定的部分到当前命名空间中。语法如下:

from modname import name1[, name2[, ... nameN]]

例如,要导入模块 fib 的 fibonacci 函数,使用如下语句:

from fib import fibonacci

这个声明不会把整个 fib 模块导入到当前的命名空间中,它只会将 fib 里的 fibonacci 单个引入到执行这个声明的模块的全局符号表。

from…import* 语句

把一个模块的所有内容全都导入到当前的命名空间也是可行的,只需使用如下声明:

from modname import *

这提供了一个简单的方法来导入一个模块中的所有项目。然而这种声明不该被过多地使用。

例如我们想一次性引入 math 模块中所有的东西,语句如下:

from math import *
globals() 和 locals() 函数

根据调用地方的不同,globals() 和 locals() 函数可被用来返回全局和局部命名空间里的名字。

如果在函数内部调用 locals(),返回的是所有能在该函数里访问的命名。

如果在函数内部调用 globals(),返回的是所有在该函数里能访问的全局名字。

两个函数的返回类型都是字典。所以名字们能用 keys() 函数摘取。


读取键盘输入

Python提供了两个内置函数从标准输入读入一行文本,默认的标准输入是键盘。如下:

  • raw_input
  • input

input函数

input([prompt]) 函数从标准输入读取一个行,并返回一个字符串(去掉结尾的换行符)

请输入:23
你输入的内容是:  23
打开和关闭文件

您已经可以向标准输入和输出进行读写。现在,来看看怎么读写实际的数据文件。
Python 提供了必要的函数和方法进行默认情况下的文件基本操作。你可以用 file 对象做大部分的文件操作

open 函数

你必须先用Python内置的open()函数打开一个文件,创建一个file对象,相关的方法才可以调用它进行读写。

语法:

file object = open(file_name [, access_mode][, buffering])

各个参数的细节如下:

  • file_name:file_name变量是一个包含了你要访问的文件名称的字符串值。
  • access_mode:access_mode决定了打开文件的模式:只读,写入,追加等。所有可取值见如下的完全列表。这个参数是非强制的,默认文件访问模式为只读(r)。
  • buffering:如果buffering的值被设为0,就不会有寄存。如果buffering的值取1,访问文件时会寄存行。如果将buffering的值设为大于1的整数,表明了这就是的寄存区的缓冲大小。如果取负值,寄存区的缓冲大小则为系统默认。

不同模式打开文件的完全列表:

模式 描述
t 文本模式 (默认)。
x 写模式,新建一个文件,如果该文件已存在则会报错。
b 二进制模式。
+ 打开一个文件进行更新(可读可写)。
U 通用换行模式(不推荐)。
r 以只读方式打开文件。文件的指针将会放在文件的开头。这是默认模式。
rb 以二进制格式打开一个文件用于只读。文件指针将会放在文件的开头。这是默认模式。一般用于非文本文件如图片等。
r+ 打开一个文件用于读写。文件指针将会放在文件的开头。
rb+ 以二进制格式打开一个文件用于读写。文件指针将会放在文件的开头。一般用于非文本文件如图片等。
w 打开一个文件只用于写入。如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除。如果该文件不存在,创建新文件。
wb 以二进制格式打开一个文件只用于写入。如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除。如果该文件不存在,创建新文件。一般用于非文本文件如图片等。
w+ 打开一个文件用于读写。如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除。如果该文件不存在,创建新文件。
wb+ 以二进制格式打开一个文件用于读写。如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除。如果该文件不存在,创建新文件。一般用于非文本文件如图片等。
a 打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。也就是说,新的内容将会被写入到已有内容之后。如果该文件不存在,创建新文件进行写入。
ab 以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。也就是说,新的内容将会被写入到已有内容之后。如果该文件不存在,创建新文件进行写入。
a+ 打开一个文件用于读写。如果该文件已存在,文件指针将会放在文件的结尾。文件打开时会是追加模式。如果该文件不存在,创建新文件用于读写。
ab+ 以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。如果该文件不存在,创建新文件用于读写
模式 r r+ w w+ a a+
+ + + +
+ + + + +
创建 + + + +
覆盖 + +
指针在开始 + + + +
指针在结尾 + +
File对象的属性

一个文件被打开后,你有一个file对象,你可以得到有关该文件的各种信息。

以下是和file对象相关的所有属性的列表:

属性 描述
file.closed 返回true如果文件已被关闭,否则返回false。
file.mode 返回被打开文件的访问模式。
file.name 返回文件的名称。
file.softspace 如果用print输出后,必须跟一个空格符,则返回false。否则返回true。

以上实例输出结果:

文件名:  foo.txt
是否已关闭 :  False
访问模式 :  w
末尾是否强制加空格 :  0
close()方法

File 对象的 close()方法刷新缓冲区里任何还没写入的信息,并关闭该文件,这之后便不能再进行写入。

当一个文件对象的引用被重新指定给另一个文件时,Python 会关闭之前的文件。用 close()方法关闭文件是一个很好的习惯。

语法:

fileObject.close()

例子:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
# 打开一个文件
fo = open("foo.txt", "w")
print "文件名: ", fo.name
 
# 关闭打开的文件
fo.close()

以上实例输出结果:

文件名:  foo.txt
write()方法

write()方法可将任何字符串写入一个打开的文件。需要重点注意的是,Python字符串可以是二进制数据,而不是仅仅是文字。

write()方法不会在字符串的结尾添加换行符(‘\n’):

语法:

fileObject.write(string)

在这里,被传递的参数是要写入到已打开文件的内容。

例子:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
# 打开一个文件
fo = open("foo.txt", "w")
fo.write( "www.runoob.com!\nVery good site!\n")
 
# 关闭打开的文件
fo.close()
read()方法

read()方法从一个打开的文件中读取一个字符串。需要重点注意的是,Python字符串可以是二进制数据,而不是仅仅是文字。

语法:

fileObject.read([count])

在这里,被传递的参数是要从已打开文件中读取的字节计数。该方法从文件的开头开始读入,如果没有传入count,它会尝试尽可能多地读取更多的内容,很可能是直到文件的末尾。

例子:

这里我们用到以上创建的 foo.txt 文件。

实例:

\#!/usr/bin/python
\# -*- coding: UTF-8 -*-

\# 打开一个文件
fo = open("foo.txt", "r+")
str = fo.read(10)
**print** "读取的字符串是 : ", str
\# 关闭打开的文件
fo.close()
---------------------
读取的字符串是 :  www.runoob
重命名和删除文件

Python的os模块提供了帮你执行文件处理操作的方法,比如重命名和删除文件。
要使用这个模块,你必须先导入它,然后才可以调用相关的各种功能。

rename() 方法

rename() 方法需要两个参数,当前的文件名和新文件名。

语法:

os.rename(current_file_name, new_file_name)

例子:

下例将重命名一个已经存在的文件test1.txt。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 重命名文件test1.txt到test2.txt。
os.rename( "test1.txt", "test2.txt" )
remove()方法

你可以用remove()方法删除文件,需要提供要删除的文件名作为参数。

语法:

os.remove(file_name)

例子:

下例将删除一个已经存在的文件test2.txt。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 删除一个已经存在的文件test2.txt
os.remove("test2.txt")
mkdir()方法

可以使用os模块的mkdir()方法在当前目录下创建新的目录们。你需要提供一个包含了要创建的目录名称的参数。

语法:

os.mkdir("newdir")

例子:

下例将在当前目录下创建一个新目录test。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 创建目录test
os.mkdir("test")
chdir()方法

可以用chdir()方法来改变当前的目录。chdir()方法需要的一个参数是你想设成当前目录的目录名称。

语法:

os.chdir("newdir")

例子:

下例将进入”/home/newdir”目录。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 将当前目录改为"/home/newdir"
os.chdir("/home/newdir")
getcwd() 方法

getcwd()方法显示当前的工作目录。

语法:

os.getcwd()

例子:

下例给出当前目录:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 给出当前的目录
print os.getcwd()
rmdir()方法

rmdir()方法删除目录,目录名称以参数传递。

在删除这个目录之前,它的所有内容应该先被清除。

语法:

os.rmdir('dirname')

例子:

以下是删除” /tmp/test”目录的例子。目录的完全合规的名称必须被给出,否则会在当前目录下搜索该目录。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
 
# 删除”/tmp/test”目录
os.rmdir( "/tmp/test"  )
Python File(文件) 方法
open() 方法

Python open() 方法用于打开一个文件,并返回文件对象,在对文件进行处理过程都需要使用到这个函数,如果该文件无法被打开,会抛出 OSError。

注意:使用 open() 方法一定要保证关闭文件对象,即调用 close() 方法。

open() 函数常用形式是接收两个参数:文件名(file)和模式(mode)。

open(file, mode='r')

完整的语法格式为:

open(file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None)

参数说明:

  • file: 必需,文件路径(相对或者绝对路径)。
  • mode: 可选,文件打开模式
  • buffering: 设置缓冲
  • encoding: 一般使用utf8
  • errors: 报错级别
  • newline: 区分换行符
  • closefd: 传入的file参数类型
  • opener: 设置自定义开启器,开启器的返回值必须是一个打开的文件描述符。
file 对象

file 对象使用 open 函数来创建,下表列出了 file 对象常用的函数:

序号 方法及描述
1 file.close()关闭文件。关闭后文件不能再进行读写操作。
2 file.flush()刷新文件内部缓冲,直接把内部缓冲区的数据立刻写入文件, 而不是被动的等待输出缓冲区写入。
3 file.fileno()返回一个整型的文件描述符(file descriptor FD 整型), 可以用在如os模块的read方法等一些底层操作上。
4 file.isatty()如果文件连接到一个终端设备返回 True,否则返回 False。
5 file.next()返回文件下一行。
6 [file.read(size])从文件读取指定的字节数,如果未给定或为负则读取所有。
7 [file.readline(size])读取整行,包括 “\n” 字符。
8 [file.readlines(sizeint])读取所有行并返回列表,若给定sizeint>0,则是设置一次读多少字节,这是为了减轻读取压力。
9 [file.seek(offset, whence])设置文件当前位置
10 file.tell()返回文件当前位置。
11 [file.truncate(size])截取文件,截取的字节通过size指定,默认为当前文件位置。
12 file.write(str)将字符串写入文件,返回的是写入的字符长度。
13 file.writelines(sequence)向文件写入一个序列字符串列表,如果需要换行则要自己加入每行的换行符。
异常处理

捕捉异常可以使用try/except语句。

try/except语句用来检测try语句块中的错误,从而让except语句捕获异常信息并处理。

如果你不想在异常发生时结束你的程序,只需在try里捕获它。

语法:

以下为简单的try….except…else的语法:

try:
<语句>        #运行别的代码
except <名字>:
<语句>        #如果在try部份引发了'name'异常
except <名字>,<数据>:
<语句>        #如果引发了'name'异常,获得附加的数据
else:
<语句>        #如果没有异常发生

try的工作原理是,当开始一个try语句后,python就在当前程序的上下文中作标记,这样当异常出现时就可以回到这里,try子句先执行,接下来会发生什么依赖于执行时是否出现异常。

  • 如果当try后的语句执行时发生异常,python就跳回到try并执行第一个匹配该异常的except子句,异常处理完毕,控制流就通过整个try语句(除非在处理异常时又引发新的异常)。
  • 如果在try后的语句里发生了异常,却没有匹配的except子句,异常将被递交到上层的try,或者到程序的最上层(这样将结束程序,并打印默认的出错信息)。
  • 如果在try子句执行时没有发生异常,python将执行else语句后的语句(如果有else的话),然后控制流通过整个try语句。
#!/usr/bin/python
# -*- coding: UTF-8 -*-

try:
    fh = open("testfile", "w")
    fh.write("这是一个测试文件,用于测试异常!!")
except IOError:
    print "Error: 没有找到文件或读取文件失败"
else:
    print "内容写入文件成功"
    fh.close()
使用except而不带任何异常类型
try:
   # 正常的操作
   ......................
except:
   # 发生异常,执行这块代码
   ......................
else:
   # 如果没有异常执行这块代码

以上方式try-except语句捕获所有发生的异常。但这不是一个很好的方式,我们不能通过该程序识别出具体的异常信息。因为它捕获所有的异常。

使用except而带多种异常类型

你也可以使用相同的except语句来处理多个异常信息,如下所示:

try:
   # 正常的操作
   ......................
except(Exception1[, Exception2[,...ExceptionN]]):
   # 发生以上多个异常中的一个,执行这块代码
   ......................
else:
   # 如果没有异常执行这块代码
try-finally 语句

try-finally 语句无论是否发生异常都将执行最后的代码。

try:
<语句>
finally:
<语句>    #退出try时总会执行
raise

---------------------------
try:
    fh = open("testfile", "w")
    fh.write("这是一个测试文件,用于测试异常!!")
finally:
    print "Error: 没有找到文件或读取文件失败"

同样的例子也可以写成如下方式

#!/usr/bin/python
# -*- coding: UTF-8 -*-

try:
    fh = open("testfile", "w")
    try:
        fh.write("这是一个测试文件,用于测试异常!!")
    finally:
        print "关闭文件"
        fh.close()
except IOError:
    print "Error: 没有找到文件或读取文件失败"

当在try块中抛出一个异常,立即执行finally块代码。finally块中的所有语句执行后,异常被再次触发,并执行except块代码。参数的内容不同于异常。

触发异常

实例

一个异常可以是一个字符串,类或对象。 Python的内核提供的异常,大多数都是实例化的类,这是一个类的实例的参数。

定义一个异常非常简单,如下所示:

实例

def functionName( level ):
  if level < 1:
    raise Exception("Invalid level!", level)
    \# 触发异常后,后面的代码就不会再执行

为了能够捕获异常,”except”语句必须有用相同的异常来抛出类对象或者字符串。例如我们捕获以上异常,”except”语句如下所示:

# 触发异常后,后面的代码就不会再执行  触发异常后,后面的代码就不会再执行
try:
    正常逻辑
except Exception,err:
    触发自定义异常    
else:
    其余代码

----------------------
#!/usr/bin/python
# -*- coding: UTF-8 -*-

# 定义函数
def mye( level ):
    if level < 1:
        raise Exception,"Invalid level!"
        # 触发异常后,后面的代码就不会再执行
try:
    mye(0)            # 触发异常
except Exception,err:
    print 1,err
else:
    print 2
----------------------
$ python test.py 
1 Invalid level!
用户自定义异常

通过创建一个新的异常类,程序可以命名它们自己的异常。异常应该是典型的继承自Exception类,通过直接或间接的方式。以下为与RuntimeError相关的实例,实例中创建了一个类,基类为RuntimeError,用于在异常触发时输出更多的信息。在try语句块中,用户自定义的异常后执行except块语句,变量 e 是用于创建Networkerror类的实例。

class Networkerror(RuntimeError):
    def __init__(self, arg):
        self.args = arg

在你定义以上类后,你可以触发该异常,如下所示:

try:
    raise Networkerror("Bad hostname")
except Networkerror,e:
    print e.args
Python OS 文件/目录方法
序号 方法及描述
1 os.access(path, mode) 检验权限模式
2 os.chdir(path) 改变当前工作目录
3 os.chflags(path, flags) 设置路径的标记为数字标记。
4 os.chmod(path, mode) 更改权限
5 os.chown(path, uid, gid) 更改文件所有者
6 os.chroot(path) 改变当前进程的根目录
7 os.close(fd) 关闭文件描述符 fd
8 os.closerange(fd_low, fd_high) 关闭所有文件描述符,从 fd_low (包含) 到 fd_high (不包含), 错误会忽略
9 os.dup(fd) 复制文件描述符 fd
10 os.dup2(fd, fd2) 将一个文件描述符 fd 复制到另一个 fd2
11 os.fchdir(fd) 通过文件描述符改变当前工作目录
12 os.fchmod(fd, mode) 改变一个文件的访问权限,该文件由参数fd指定,参数mode是Unix下的文件访问权限。
13 os.fchown(fd, uid, gid) 修改一个文件的所有权,这个函数修改一个文件的用户ID和用户组ID,该文件由文件描述符fd指定。
14 os.fdatasync(fd) 强制将文件写入磁盘,该文件由文件描述符fd指定,但是不强制更新文件的状态信息。
15 [os.fdopen(fd, mode[, bufsize]]) 通过文件描述符 fd 创建一个文件对象,并返回这个文件对象
16 os.fpathconf(fd, name) 返回一个打开的文件的系统配置信息。name为检索的系统配置的值,它也许是一个定义系统值的字符串,这些名字在很多标准中指定(POSIX.1, Unix 95, Unix 98, 和其它)。
17 os.fstat(fd) 返回文件描述符fd的状态,像stat()。
18 os.fstatvfs(fd) 返回包含文件描述符fd的文件的文件系统的信息,像 statvfs()
19 os.fsync(fd) 强制将文件描述符为fd的文件写入硬盘。
20 os.ftruncate(fd, length) 裁剪文件描述符fd对应的文件, 所以它最大不能超过文件大小。
21 os.getcwd() 返回当前工作目录
22 os.getcwdu() 返回一个当前工作目录的Unicode对象
23 os.isatty(fd) 如果文件描述符fd是打开的,同时与tty(-like)设备相连,则返回true, 否则False。
24 os.lchflags(path, flags) 设置路径的标记为数字标记,类似 chflags(),但是没有软链接
25 os.lchmod(path, mode) 修改连接文件权限
26 os.lchown(path, uid, gid) 更改文件所有者,类似 chown,但是不追踪链接。
27 os.link(src, dst) 创建硬链接,名为参数 dst,指向参数 src
28 os.listdir(path) 返回path指定的文件夹包含的文件或文件夹的名字的列表。
29 os.lseek(fd, pos, how) 设置文件描述符 fd当前位置为pos, how方式修改: SEEK_SET 或者 0 设置从文件开始的计算的pos; SEEK_CUR或者 1 则从当前位置计算; os.SEEK_END或者2则从文件尾部开始. 在unix,Windows中有效
30 os.lstat(path) 像stat(),但是没有软链接
31 os.major(device) 从原始的设备号中提取设备major号码 (使用stat中的st_dev或者st_rdev field)。
32 os.makedev(major, minor) 以major和minor设备号组成一个原始设备号
33 [os.makedirs(path, mode]) 递归文件夹创建函数。像mkdir(), 但创建的所有intermediate-level文件夹需要包含子文件夹。
34 os.minor(device) 从原始的设备号中提取设备minor号码 (使用stat中的st_dev或者st_rdev field )。
35 [os.mkdir(path, mode]) 以数字mode的mode创建一个名为path的文件夹.默认的 mode 是 0777 (八进制)。
36 [os.mkfifo(path, mode]) 创建命名管道,mode 为数字,默认为 0666 (八进制)
37 [os.mknod(filename, mode=0600, device]) 创建一个名为filename文件系统节点(文件,设备特别文件或者命名pipe)。
38 [os.open(file, flags, mode]) 打开一个文件,并且设置需要的打开选项,mode参数是可选的
39 os.openpty() 打开一个新的伪终端对。返回 pty 和 tty的文件描述符。
40 os.pathconf(path, name) 返回相关文件的系统配置信息。
41 os.pipe() 创建一个管道. 返回一对文件描述符(r, w) 分别为读和写
42 [os.popen(command, mode[, bufsize]]) 从一个 command 打开一个管道
43 os.read(fd, n) 从文件描述符 fd 中读取最多 n 个字节,返回包含读取字节的字符串,文件描述符 fd对应文件已达到结尾, 返回一个空字符串。
44 os.readlink(path) 返回软链接所指向的文件
45 os.remove(path) 删除路径为path的文件。如果path 是一个文件夹,将抛出OSError; 查看下面的rmdir()删除一个 directory。
46 os.removedirs(path) 递归删除目录。
47 os.rename(src, dst) 重命名文件或目录,从 src 到 dst
48 os.renames(old, new) 递归地对目录进行更名,也可以对文件进行更名。
49 os.rmdir(path) 删除path指定的空目录,如果目录非空,则抛出一个OSError异常。
50 os.stat(path) 获取path指定的路径的信息,功能等同于C API中的stat()系统调用。
51 [os.stat_float_times(newvalue]) 决定stat_result是否以float对象显示时间戳
52 os.statvfs(path) 获取指定路径的文件系统统计信息
53 os.symlink(src, dst) 创建一个软链接
54 os.tcgetpgrp(fd) 返回与终端fd(一个由os.open()返回的打开的文件描述符)关联的进程组
55 os.tcsetpgrp(fd, pg) 设置与终端fd(一个由os.open()返回的打开的文件描述符)关联的进程组为pg。
56 [os.tempnam(dir[, prefix]]) 返回唯一的路径名用于创建临时文件。
57 os.tmpfile() 返回一个打开的模式为(w+b)的文件对象 .这文件对象没有文件夹入口,没有文件描述符,将会自动删除。
58 os.tmpnam() 为创建一个临时文件返回一个唯一的路径
59 os.ttyname(fd) 返回一个字符串,它表示与文件描述符fd 关联的终端设备。如果fd 没有与终端设备关联,则引发一个异常。
60 os.unlink(path) 删除文件
61 os.utime(path, times) 返回指定的path文件的访问和修改的时间。
62 [os.walk(top[, topdown=True[, onerror=None[, followlinks=False]]])](https://www.runoob.com/python/os-walk.html) 输出在文件夹中的文件名通过在树中游走,向上或者向下。
63 os.write(fd, str) 写入字符串到文件描述符 fd中. 返回实际写入的字符串长度
64 os.path 模块 获取文件的属性信息。
Python 内置函数
内置函数
abs() divmod() input() open() staticmethod()
all() enumerate() int() ord() str()
any() eval() isinstance() pow() sum()
basestring() execfile() issubclass() print() super()
bin() file() iter() property() tuple()
bool() filter() len() range() type()
bytearray() float() list() raw_input() unichr()
callable() format() locals() reduce() unicode()
chr() frozenset() long() reload() vars()
classmethod() getattr() map() repr() xrange()
cmp() globals() max() reverse() zip()
compile() hasattr() memoryview() round() import()
complex() hash() min() set()
delattr() help() next() setattr()
dict() hex() object() slice()
dir() id() oct() sorted() exec 内置表达式



Python高级

创建类 [面向对象]
class Employee:
    '所有员工的基本类型'
    empCount = 0

    def __init__(self, name, salary):
        self.name = name
        self.salary = salary
        Employee.empCount += 1

    def displayCount(self):
        print ("Total Employee %d" % Employee.empCount)
    def displayEmployee(self):
        print ("Name : ", self.name,  ", Salary: ", self.salary)

------------------------------------------
class Employee:  
    # 定义一个名为 Employee 的新类
    '所有员工的基本类型'  
    # 类的文档字符串,描述类的用途

    empCount = 0  
    # 类变量,用于跟踪类实例的数量

    def __init__(self, name, salary):  
        # 类的构造函数,用于创建类的新实例
        self.name = name  
        # 将参数 name 的值赋给实例变量 name
        self.salary = salary 
        # 将参数 salary 的值赋给实例变量 salary
        Employee.empCount += 1  
        # 每创建一个实例,类变量 empCount 的值就增加 1

    def displayCount(self):  
        # 类的方法,用于显示当前员工数量
        print("Total Employee %d" % Employee.empCount)              # 打印员工总数

    def displayEmployee(self):  
        # 类的方法,用于显示员工信息
        print("Name : ", self.name, ", Salary: ", self.salary)  
        # 打印员工的姓名和薪水
  • empCount 变量是一个类变量,它的值将在这个类的所有实例之间共享。你可以在内部类或外部类使用 Employee.empCount 访问。
  • 第一种方法__init__()方法是一种特殊的方法,被称为类的构造函数或初始化方法,当创建了这个类的实例时就会调用该方法
  • self 代表类的实例,self 在定义类的方法时是必须有的,虽然在调用时不必传入相应的参数。
创建实例对象

实例化类其他编程语言中一般用关键字 new,但是在 Python 中并没有这个关键字,类的实例化类似函数调用方式。以下使用类的名称 Employee 来实例化,并通过 init 方法接收参数

# 其余的都在上面的代码中
emp1 = Employee("Zara", 2000)
emp2 = Employee("Manni", 5000)
访问属性

您可以使用点号 . 来访问对象的属性。使用如下类的名称访问类变量:

emp1.displayEmployee()
emp2.displayEmployee()
print "Total Employee %d" % Employee.empCount
----------------------------------
Name :  Zara , Salary:  2000
Name :  Manni , Salary:  5000
Total Employee:2

你可以添加,删除,修改类的属性,如下所示:

emp1.age = 7  # 添加一个 'age' 属性
emp1.age = 8  # 修改 'age' 属性
del emp1.age  # 删除 'age' 属性

你也可以使用以下函数的方式来访问属性:

  • getattr(obj, name[, default]) : 访问对象的属性。
  • hasattr(obj,name) : 检查是否存在一个属性。
  • setattr(obj,name,value) : 设置一个属性。如果属性不存在,会创建一个新属性。
  • delattr(obj, name) : 删除属性。

hasattr(emp1, ‘age’) # 如果存在 ‘age’ 属性返回 True。
getattr(emp1, ‘age’) # 返回 ‘age’ 属性的值
setattr(emp1, ‘age’, 8) # 添加属性 ‘age’ 值为 8
delattr(emp1, ‘age’) # 删除属性 ‘age’

emp1.displayEmployee()
emp2.displayEmployee()
emp1.age = 7 # 添加一个 'age' 属性
print("姓名:%s" %emp1.name, "年龄:%d" %emp1.age)
print ("总计:%d" %Employee.empCount)
-----------------------------------
Name :  Zara , Salary:  2000
Name :  Manni , Salary:  5000
姓名:Zara 年龄:7
总计:2
python对象销毁(垃圾回收)

Python 使用了引用计数这一简单技术来跟踪和回收垃圾。在 Python 内部记录着所有使用中的对象各有多少引用。一个内部跟踪变量,称为一个引用计数器。当对象被创建时,就创建了一个引用计数,当这个对象不再需要时, 也就是说, 这个对象的引用计数变为0 时, 它被垃圾回收。但是回收不是”立即”的, 由解释器在适当的时机,将垃圾对象占用的内存空间回收。

a = 40      # 创建对象  <40>
b = a       # 增加引用, <40> 的计数
c = [b]     # 增加引用.  <40> 的计数

del a       # 减少引用 <40> 的计数
b = 100     # 减少引用 <40> 的计数
c[0] = -1   # 减少引用 <40> 的计数

垃圾回收机制不仅针对引用计数为0的对象,同样也可以处理循环引用的情况。循环引用指的是,两个对象相互引用,但是没有其他变量引用他们。这种情况下,仅使用引用计数是不够的。Python 的垃圾收集器实际上是一个引用计数器和一个循环垃圾收集器。作为引用计数的补充, 垃圾收集器也会留心被分配的总量很大(即未通过引用计数销毁的那些)的对象。 在这种情况下, 解释器会暂停下来, 试图清理所有未引用的循环。

类的继承

面向对象的编程带来的主要好处之一是代码的重用,实现这种重用的方法之一是通过继承机制。通过继承创建的新类称为子类派生类,被继承的类称为基类父类超类

继承语法

class 派生类名(基类名)
    ...

在python中继承中的一些特点:

  • 1、如果在子类中需要父类的构造方法就需要显式的调用父类的构造方法,或者不重写父类的构造方法。详细说明可查看: python 子类继承父类构造函数说明
  • 2、在调用基类的方法时,需要加上基类的类名前缀,且需要带上 self 参数变量。区别在于类中调用普通函数时并不需要带上 self 参数
  • 3、Python 总是首先查找对应类型的方法,如果它不能在派生类中找到对应的方法,它才开始到基类中逐个查找。(先在本类中查找调用的方法,找不到才去基类中找)。

如果在继承元组中列了一个以上的类,那么它就被称作”多重继承” 。

语法:

派生类的声明,与他们的父类类似,继承的基类列表跟在类名之后
如下所示:

class SubClassName (ParentClass1[, ParentClass2, ...]):
    ...
方法重写

如果你的父类方法的功能不能满足你的需求,你可以在子类重写你父类的方法:

class Parent:
    def myMethod(self):
        print ('调用父类方法')
class Child(Parent):
    def myMethod(self):
        print ('调用子类方法')
c = Child()
c.myMethod()
类属性与方法

类的私有属性

“_ _” → __private_attrs 两个下划线开头,声明属性为私有,不能在外部被使用或直接访问。在类内部的方法中使用时 self.__private_attrs

类的方法

在类的内部,使用def关键字可以为类定义一个方法,与一般函数定义不同,类方法必须包含参数self,且为第一个参数

类的私有方法

__private_method:两个下划线开头,声明该方法为私有方法,不能在类的外部调用。
在类的内部调用self.__private_methods

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
class JustCounter:
    __secretCount = 0  # 私有变量
    publicCount = 0    # 公开变量
 
    def count(self):
        self.__secretCount += 1
        self.publicCount += 1
        print self.__secretCount
 
counter = JustCounter()
counter.count()
counter.count()
print counter.publicCount
print counter.__secretCount  # 报错,实例不能访问私有变量
----------------------------------------------------
1
2
2
Traceback (most recent call last):
  File "test.py", line 17, in <module>
    print counter.__secretCount  # 报错,实例不能访问私有变量
AttributeError: JustCounter instance has no attribute '__secretCount'

Python不允许实例化的类访问私有数据,但你可以使用 object._className__attrName对象名._类名__私有属性名 )访问属性,参考以下实例:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

class Runoob:
    __site = "www.runoob.com"

runoob = Runoob()
print runoob._Runoob__site

执行以上代码,执行结果如下:

www.runoob.com
下划线、双下划线、头尾双下划线说明:
  • _ _ foo _ _: 定义的是特殊方法,一般是系统定义名字 ,类似 init() 之类的。
  • _ foo: 以单下划线开头的表示的是 protected 类型的变量,即保护类型只能允许其本身与子类进行访问,不能用于 from module import *
  • _ _ foo: 双下划线的表示的是私有类型(private)的变量, 只能是允许这个类本身进行访问了。

正则表达

re.match() 与 re.search()的区别

在 Python 的正则表达式模块 re 中,re.match()re.search() 都用于在字符串中查找正则表达式的匹配项,但它们在查找匹配项的方式上有所不同。

re.match() 只在字符串的开始处进行匹配。如果字符串的开头与正则表达式不匹配,re.match()将返回None

import re
# 使用 re.search()
string = "hello world"
pattern = "hello"
match = re.match(pattern, string)
if match:
    print(f"匹配成功: {match.group()}")
else:
    print("匹配失败")
----------------------
匹配成功: hello

re.search() 会扫描整个字符串,直到找到第一个匹配正则表达式的子串。它在整个字符串中查找匹配项,而不仅仅是在字符串的开始处。

import re
# 使用 re.search()
string = "world hello"
pattern = "hello"

search = re.search(pattern, string)

if search:
    print(f"Match found: {search.group()}")
else:
    print("No match found.")
-------------------------------
Match found: hello
检索和替换

Python 的 re 模块提供了re.sub用于替换字符串中的匹配项。

re.sub(pattern, repl, string, count=0, flags=0)

参数:

  • pattern : 正则中的模式字符串。
  • repl : 替换的字符串,也可为一个函数。
  • string : 要被查找替换的原始字符串。
  • count : 模式匹配后替换的最大次数,默认 0 表示替换所有的匹配。

代码分析

import re

phone = "2004-959-559 # 这是一个国外电话号码"

# 删除字符串中的 Python注释 
num = re.sub(r'#.*$', "", phone)
print("电话号码是: ", num)

# 删除非数字(-)的字符串 
num = re.sub(r'\D', "", phone)
print("电话号码是 : ", num)

第一部分:删除字符串中的注释

num = re.sub(r'#.*$', "", phone)
  • re.sub() 是 re 模块中的函数,用于替换字符串中的匹配项。

  • r'#.*$'
    

    是正则表达式模式:

    • # 表示匹配字符 #
    • .* 表示匹配任意字符(.)任意次数(*)。
    • $ 表示匹配字符串的末尾。
  • "" 是替换字符串,这里是一个空字符串,表示匹配到的部分将被删除。

  • phone 是要被处理的原始字符串。

这个正则表达式 r'#.*$' 的意思是:从 # 字符开始,匹配直到字符串末尾的所有字符。因此,该行代码将删除字符串 phone 中从 # 开始到字符串末尾的所有内容。

执行结果:

复制

电话号码是:  2004-959-559 

第二部分:删除非数字字符

num = re.sub(r'\D', "", phone)
  • re.sub() 同上,用于替换字符串中的匹配项。

  • r'\D'
    

    是正则表达式模式:

    • \D 表示匹配任意非数字字符。
  • "" 同上,是一个空字符串,表示匹配到的非数字字符将被删除。

  • phone 同上,是要被处理的原始字符串。

这个正则表达式 r'\D' 的意思是:匹配字符串中所有的非数字字符。因此,该行代码将删除字符串 phone 中所有非数字字符。

电话号码是 :  2004959559 

总结

这段代码展示了如何使用 re.sub() 函数结合正则表达式来清理和格式化字符串。第一个例子中,我们移除了字符串末尾的注释。第二个例子中,我们移除了电话号码中的所有非数字字符,从而得到一个仅包含数字的字符串。这些操作在数据清洗和预处理中非常有用。

import re
string = "hello world"
pattern = "hello"
match = re.match(pattern, string)
if match:
    print(f"匹配成功: {match.group()}")
else:
    print("匹配失败")
print('-----------------')
import re
phone = "2004-959-559 # 这是一个国外电话号码"
num = re.sub("#.*$","",phone)
print("电话号码是: ", num)
num2 = re.sub(r'\D',"",phone)
print("电话号码是: ", num2)
-----------------------------------------
电话号码是:  2004-959-559 
电话号码是:  2004959559

★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★ ★

print(f"匹配成功: {match.group()}")

f-string 是 Python 3.6 及以上版本中引入的一种新的字符串格式化方法,它允许你在字符串字面量中嵌入表达式,并在运行时动态地进行求值和格式化。

  • f 告诉 Python 这是一个格式化字符串。
  • {match.group()} 是一个表达式,它将被求值,并且其结果将插入到字符串中相应的位置。
findall

在字符串中找到正则表达式所匹配的所有子串,并返回一个列表,如果有多个匹配模式,则返回元组列表,如果没有找到匹配的,则返回空列表。

注意: match 和 search 是匹配一次,findall 是匹配所有。

语法格式为:

findall(string[, pos[, endpos]])

参数:

  • string : 待匹配的字符串。
  • pos : 可选参数,指定字符串的起始位置,默认为 0。
  • endpos : 可选参数,指定字符串的结束位置,默认为字符串的长度。

查找字符串中的所有数字:

# -*- coding:UTF8 -*-
 
import re
# 这里您编译了一个正则表达式模式,该模式 r'\d+' 匹配一个或多个数字。
# \d 表示数字 [0-9],+ 表示一个或多个。
pattern = re.compile(r'\d+')   # 查找数字
# 这里您在字符串 'runoob 123 google 456' 中查找所有匹配的数字。
# 由于字符串中有两个数字序列 '123' 和 '456',所以返回一个列表 ['123', '456']
result1 = pattern.findall('runoob 123 google 456')
# 在这里,您同样查找数字,但是指定了搜索的起始位置 0 和结束位置 10。
# 这意味着搜索只会在字符串的前10个字符内进行。在这个范围内,有两个数字序列 '88' 和 '12',所以返回列表 ['88', '12']
result2 = pattern.findall('run88oob123google456', 0, 10)
 
print(result1)
print(result2)

输出结果:

['123', '456']
['88', '12']
re.finditer

和 findall 类似,在字符串中找到正则表达式所匹配的所有子串,并把它们作为一个迭代器返回。

re.finditer(pattern, string, flags=0)
参数 描述
pattern 匹配的正则表达式
string 要匹配的字符串。
flags 标志位,用于控制正则表达式的匹配方式,如:是否区分大小写,多行匹配等等。参见:正则表达式修饰符 - 可选标志
# -*- coding: UTF-8 -*-
 
import re
 
it = re.finditer(r"\d+","12a32bc43jf3") 
for match in it:
    print(match)          # 打印匹配对象的描述
    print("==========")
    print(match.group())  # 打印匹配到的字符串

下面是具体的区别:

  • 当您执行 print(match) 时,您实际上是在打印匹配对象的内存地址或者一个表示匹配对象的字符串。这个输出通常用于调试,以便快速查看对象的类型和标识。

    print(match.group()) 是用来获取匹配对象中的实际匹配到的字符串。group() 方法返回正则表达式匹配到的整个字符串。

  • print(match)

    • 打印的是匹配对象的描述,通常包括对象的类型和内存地址。
    • 对于调试目的,这有助于了解对象的身份,但不显示匹配到的文本。
  • print(match.group())

    • 打印的是通过正则表达式匹配到的具体字符串。
    • 这是获取匹配文本的实际内容的方法。
re.split

split方法按照能够匹配的子串将字符串分割后返回列表

import re
# 定义一个包含单词和数字的字符串
text = "单词1单词2单词3单词4"
# 使用 re.split() 按照数字进行分割,但不保留数字
# (?:\d+) 匹配一个或多个数字,但不会捕获匹配的数字
result = re.split(r'(?:\d+)', text)
print(result)
正则表达式模式

模式字符串使用特殊的语法来表示一个正则表达式:
字母和数字表示他们自身。一个正则表达式模式中的字母和数字匹配同样的字符串。
多数字母和数字前加一个反斜杠时会拥有不同的含义。
标点符号只有被转义时才匹配自身,否则它们表示特殊的含义。
反斜杠本身需要使用反斜杠转义。
由于正则表达式通常都包含反斜杠,所以你最好使用原始字符串来表示它们。模式元素(如 r’\t’,等价于 ‘\t’)匹配相应的特殊字符。
下表列出了正则表达式模式语法中的特殊元素。如果你使用模式的同时提供了可选的标志参数,某些模式元素的含义会改变。

模式 描述
^ 匹配字符串的开头
$ 匹配字符串的末尾。
. 匹配任意字符,除了换行符,当re.DOTALL标记被指定时,则可以匹配包括换行符的任意字符。
[…] 用来表示一组字符,单独列出:[amk] 匹配 ‘a’,’m’或’k’
[^…] 不在[]中的字符:[^abc] 匹配除了a,b,c之外的字符。
re* 匹配0个或多个的表达式。
re+ 匹配1个或多个的表达式。
re? 匹配0个或1个由前面的正则表达式定义的片段,非贪婪方式
re{ n} 精确匹配 n 个前面表达式。例如, o{2} 不能匹配 “Bob” 中的 “o”,但是能匹配 “food” 中的两个 o。
re{ n,} 匹配 n 个前面表达式。例如, o{2,} 不能匹配”Bob”中的”o”,但能匹配 “foooood”中的所有 o。”o{1,}” 等价于 “o+”。”o{0,}” 则等价于 “o*”。
re{ n, m} 匹配 n 到 m 次由前面的正则表达式定义的片段,贪婪方式
a| b 匹配a或b
(re) 对正则表达式分组并记住匹配的文本
(?imx) 正则表达式包含三种可选标志:i, m, 或 x 。只影响括号中的区域。
(?-imx) 正则表达式关闭 i, m, 或 x 可选标志。只影响括号中的区域。
(?: re) 类似 (…), 但是不表示一个组
(?imx: re) 在括号中使用i, m, 或 x 可选标志
(?-imx: re) 在括号中不使用i, m, 或 x 可选标志
(?#…) 注释.
(?= re) 前向肯定界定符。如果所含正则表达式,以 … 表示,在当前位置成功匹配时成功,否则失败。但一旦所含表达式已经尝试,匹配引擎根本没有提高;模式的剩余部分还要尝试界定符的右边。
(?! re) 前向否定界定符。与肯定界定符相反;当所含表达式不能在字符串当前位置匹配时成功
(?> re) 匹配的独立模式,省去回溯。
\w 匹配字母数字及下划线
\W 匹配非字母数字及下划线
\s 匹配任意空白字符,等价于 **[ \t\n\r\f]**。
\S 匹配任意非空字符
\d 匹配任意数字,等价于 [0-9].
\D 匹配任意非数字
\A 匹配字符串开始
\Z 匹配字符串结束,如果是存在换行,只匹配到换行前的结束字符串。
\z 匹配字符串结束
\G 匹配最后匹配完成的位置。
\b 匹配一个单词边界,也就是指单词和空格间的位置。例如, ‘er\b’ 可以匹配”never” 中的 ‘er’,但不能匹配 “verb” 中的 ‘er’。
\B 匹配非单词边界。’er\B’ 能匹配 “verb” 中的 ‘er’,但不能匹配 “never” 中的 ‘er’。
\n, \t, 等. 匹配一个换行符。匹配一个制表符。等
\1…\9 匹配第n个分组的内容。
\10 匹配第n个分组的内容,如果它经匹配。否则指的是八进制字符码的表达式
正则表达式实例

字符匹配

实例 描述
python 匹配 “python”.

字符类

实例 描述
[Pp]ython 匹配 “Python” 或 “python”
rub[ye] 匹配 “ruby” 或 “rube”
[aeiou] 匹配中括号内的任意一个字母
[0-9] 匹配任何数字。类似于 [0123456789]
[a-z] 匹配任何小写字母
[A-Z] 匹配任何大写字母
[a-zA-Z0-9] 匹配任何字母及数字
[^aeiou] 除了aeiou字母以外的所有字符
[^0-9] 匹配除了数字外的字符
特殊字符类
实例 描述
. 匹配除 “\n” 之外的任何单个字符。要匹配包括 ‘\n’ 在内的任何字符,请使用象 ‘[.\n]’ 的模式。
\d 匹配一个数字字符。等价于 [0-9]。
\D 匹配一个非数字字符。等价于 [^0-9]。
\s 匹配任何空白字符,包括空格、制表符、换页符等等。等价于 [ \f\n\r\t\v]。
\S 匹配任何非空白字符。等价于 [^ \f\n\r\t\v]。
\w 匹配包括下划线的任何单词字符。等价于’[A-Za-z0-9_]’。
\W 匹配任何非单词字符。等价于 ‘[^A-Za-z0-9_]’。
request请求的用法

以下是使用 Python 标准库中的 urllib 模块进行 HTTP GET POST 请求的简化用法。

Get请求request.get()

import requests

# 简化的 GET 请求
def simple_get(url):
    response = requests.get(url)
    return response.text

# 使用 GET 请求
url = 'http://httpbin.org/get'
response_text = simple_get(url)
print(response_text)

Post请求request.post()

import requests

# 简化的 POST 请求
def simple_post(url, data):
    response = requests.post(url, data=data)
    return response.text

# 使用 POST 请求
url = 'http://httpbin.org/post'
data = {'key1': 'value1', 'key2': 'value2'}
response_text = simple_post(url, data)
print(response_text)
SMTP发送邮件
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header


# 设置QQ邮箱的SMTP服务器地址和端口
smtp_server = 'smtp.qq.com'
smtp_port = 465  # QQ邮箱SMTP服务器端口为465或587

# 设置发件人和收件人邮箱
sender_email = '390415032@qq.com'  # 发件人邮箱
receiver_email = 'recipient@example.com'  # 收件人邮箱
password = 'your_qq_email_auth_code'  # QQ邮箱授权码

# 设置邮件主题和内容
subject = 'Test Email from Python'
body = 'This is a test email sent by Python.'

# 创建MIMEText对象,设置邮件内容和编码
msg = MIMEText(body, 'plain', 'utf-8')
msg['From'] = Header(sender_email, 'utf-8')
msg['To'] = Header(receiver_email, 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')

# 发送邮件
try:
    # 创建SMTP对象
    server = smtplib.SMTP_SSL(smtp_server, smtp_port)
    # 登录SMTP服务器
    server.login(sender_email, password)
    # 发送邮件
    server.sendmail(sender_email, [receiver_email], msg.as_string())
    print('Email sent successfully!')
except smtplib.SMTPException as e:
    print('Error: unable to send email. ', e)
finally:
    # 关闭服务器连接
    server.quit()
Python 多线程

Python中使用线程有两种方式:函数或者用类来包装线程对象。

函数式:调用thread模块中的start_new_thread()函数来产生新线程。语法如下:

thread.start_new_thread ( function, args[, kwargs] )

参数说明:

  • function - 线程函数。
  • args - 传递给线程函数的参数,他必须是个tuple类型。
  • kwargs - 可选参数。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import thread
import time
 
# 为线程定义一个函数
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % ( threadName, time.ctime(time.time()) )
 
# 创建两个线程
try:
   thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print "Error: unable to start thread"
 
while 1:
   pass

-------------------------------------------------------
Thread-1: Thu Jan 22 15:42:17 2009
Thread-1: Thu Jan 22 15:42:19 2009
Thread-2: Thu Jan 22 15:42:19 2009
Thread-1: Thu Jan 22 15:42:21 2009
Thread-2: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:25 2009
Thread-2: Thu Jan 22 15:42:27 2009
Thread-2: Thu Jan 22 15:42:31 2009
Thread-2: Thu Jan 22 15:42:35 2009

线程的结束一般依靠线程函数的自然结束;也可以在线程函数中调用thread.exit(),他抛出SystemExit exception,达到退出线程的目的。

线程模块

Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。

threading 模块提供的其他方法:

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

  • run(): 用以表示线程活动的方法。
  • start(): 启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。
使用Threading模块创建线程

使用Threading模块创建线程,直接从threading.Thread继承
然后重写__init__方法和run方法:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import threading
import time
 
exitFlag = 0
 
class myThread (threading.Thread):   #继承父类threading.Thread
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):                   #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name
 
def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            (threading.Thread).exit()
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1
 
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
 
# 开启线程
thread1.start()
thread2.start()
 
print "Exiting Main Thread"

------------------------------------------
Starting Thread-1
Starting Thread-2
Exiting Main Thread
Thread-1: Thu Mar 21 09:10:03 2013
Thread-1: Thu Mar 21 09:10:04 2013
Thread-2: Thu Mar 21 09:10:04 2013
Thread-1: Thu Mar 21 09:10:05 2013
Thread-1: Thu Mar 21 09:10:06 2013
Thread-2: Thu Mar 21 09:10:06 2013
Thread-1: Thu Mar 21 09:10:07 2013
Exiting Thread-1
Thread-2: Thu Mar 21 09:10:08 2013
Thread-2: Thu Mar 21 09:10:10 2013
Thread-2: Thu Mar 21 09:10:12 2013
Exiting Thread-2
线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下:

多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。

考虑这样一种情况:一个列表里所有元素都是0,线程”set”从后向前把所有元素改成1,而线程”print”负责从前往后读取列表并打印。

那么,可能线程”set”开始改的时候,线程”print”便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。

锁有两种状态——锁定和未锁定。每当一个线程比如”set”要访问共享数据时,必须先获得锁定;如果已经有别的线程比如”print”获得锁定了,那么就让线程”set”暂停,也就是同步阻塞;等到线程”print”访问完毕,释放锁以后,再让线程”set”继续。

经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import threading
import time
 
class myThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print "Starting " + self.name
       # 获得锁,成功获得锁定后返回True
       # 可选的timeout参数不填时将一直阻塞直到获得锁定
       # 否则超时后将返回False
        threadLock.acquire()
        print_time(self.name, self.counter, 3)
        # 释放锁
        threadLock.release()
 
def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print "%s: %s" % (threadName, time.ctime(time.time()))
        counter -= 1
 
threadLock = threading.Lock()
threads = []
 
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
 
# 开启新线程
thread1.start()
thread2.start()
 
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
 
# 等待所有线程完成
for t in threads:
    t.join()
print "Exiting Main Thread"
线程优先级队列( Queue)

Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

Queue模块中的常用方法:

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item, block=True, timeout=None) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当 Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作
#!/usr/bin/python
# -*- coding: UTF-8 -*-
 
import Queue
import threading
import time
 
exitFlag = 0
 
class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name
 
def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
        else:
            queueLock.release()
        time.sleep(1)
 
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
 
# 创建新线程
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1
 
# 填充队列
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()
 
# 等待队列清空
while not workQueue.empty():
    pass
 
# 通知线程是时候退出
exitFlag = 1
 
# 等待所有线程完成
for t in threads:
    t.join()
print "Exiting Main Thread"
---------------------------------------------------
Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-1 processing One
Thread-2 processing Two
Thread-3 processing Three
Thread-1 processing Four
Thread-2 processing Five
Exiting Thread-3
Exiting Thread-1
Exiting Thread-2
Exiting Main Thread

Python XML解析

XML 指可扩展标记语言(eXtensible Markup Language);XML 被设计用来传输和存储数据。XML 是一套定义语义标记的规则,这些标记将文档分成许多部件并对这些部件加以标识。它也是元标记语言,即定义了用于定义其他与特定领域有关的、语义的、结构化的标记语言的句法语言。常见的 XML 编程接口有 DOM 和 SAX,这两种接口处理 XML 文件的方式不同,当然使用场合也不同。
Python 有三种方法解析 XML,SAX,DOM,以及 ElementTree:

1.SAX (simple API for XML )

Python 标准库包含 SAX 解析器,SAX 用事件驱动模型,通过在解析XML的过程中触发一个个的事件并调用用户定义的回调函数来处理XML文件。

2.DOM(Document Object Model)

将 XML 数据在内存中解析成一个树,通过对树的操作来操作XML。

3.ElementTree(元素树)

ElementTree就像一个轻量级的DOM,具有方便友好的API。代码可用性好,速度快,消耗内存少。

注:因DOM需要将XML数据映射到内存中的树,一是比较慢,二是比较耗内存,而SAX流式读取XML文件,比较快,占用内存少,但需要用户实现回调函数(handler)。

XML解析一下

import xml.sax
from lxml import etree

# 确保文件路径正确
xml_file_path = 'movies.xml'

# 使用 lxml 创建一个 XML 解析器
parser = etree.XMLParser()

# 解析 XML 文件
try:
    tree = etree.parse(xml_file_path, parser)
    # 遍历所有元素并打印
    for element in tree.iter():
        print(f"Element tag: {element.tag}, text: {element.text.strip() if element.text else ''}")
except etree.XMLSyntaxError as e:
    print(f"XML 解析错误: {e}")
movies.xml
<collection shelf="New Arrivals">
<movie title="Enemy Behind">
   <type>War, Thriller</type>
   <format>DVD</format>
   <year>2003</year>
   <rating>PG</rating>
   <stars>10</stars>
   <description>Talk about a US-Japan war</description>
</movie>
<movie title="Transformers">
   <type>Anime, Science Fiction</type>
   <format>DVD</format>
   <year>1989</year>
   <rating>R</rating>
   <stars>8</stars>
   <description>A schientific fiction</description>
</movie>
   <movie title="Trigun">
   <type>Anime, Action</type>
   <format>DVD</format>
   <episodes>4</episodes>
   <rating>PG</rating>
   <stars>10</stars>
   <description>Vash the Stampede!</description>
</movie>
<movie title="Ishtar">
   <type>Comedy</type>
   <format>VHS</format>
   <rating>PG</rating>
   <stars>2</stars>
   <description>Viewable boredom</description>
</movie>
</collection>
阅读全文

LlamaIndex

2025/2/25

LlamaIndex喂食给AI并进化升级

将您的企业数据转化为可用于生产的LLM应用程序

LLM 提供人与数据之间的自然语言接口。LLM 预先训练过大量公开数据,但它们并非基于您的数据进行训练。您的数据可能是私有的,也可能是特定于您要解决的问题的数据。它隐藏在 AP1后面、SQL 数据库中,或隐藏在PDF 和幻灯片中。上下文增强使 LLM 可以使用您的数据来解决手头的问题。Llamalndex 提供构建任何上下文增强用例的工具,从原型到生产我们的工具允许您提取、解析、索引和处理您的数据,并快速实施将数据访问与 LLM 提示相结合的复杂查询工作流。
上下文增强最流行的示例是检索增强生成RAG,它在推理时将上下文与ILLM 相结合。

LlamaIndex 是一个将大语言模型(Large Language Models, LLMs,后简称大模型)和外部数据连接在一起的工具。大模型依靠上下文学习(Context Learning)来推理知识,针对一个输入(或者是prompt),根据其输出结果。因此Prompt的质量很大程度上决定了输出结果的质量,因此提示工程(Prompt engineering)现在也很受欢迎。目前大模型的输入输出长度因模型结构、显卡算力等因素影响,都有一个长度限制(以Token为单位,ChatGPT限制长度为4k个,GPT-4是32k等,Claude最新版有个100k的)。当我们外部知识的内容超过这个长度时,就无法同时将有效的信息传递给大模型。因此就诞生了 LlamaIndex 等项目。

假设有一个10w的外部数据,我们的原始输入Prompt长度为100,长度限制为4k,通过查询-检索的方式,我们能将最有效的信息提取集中在这4k的长度中,与Prompt一起送给大模型,从而让大模型得到更多的信息。此外,还能通过多轮对话的方式不断提纯外部数据,达到在有限的输入长度限制下,传达更多的信息给大模型。这部分知识可参考

LLamaIndex的任务是通过查询、检索的方式挖掘外部数据的信息,并将其传递给大模型,因此其主要由x部分组成:

  1. 数据连接。首先将数据能读取进来,这样才能挖掘。

  2. 索引构建。要查询外部数据,就必须先构建可以查询的索引,llamdaIndex将数据存储在Node中,并基于Node构建索引。索引类型包括向量索引列表索引树形索引等;

  3. 查询接口。有了索引,就必须提供查询索引的接口。通过这些接口用户可以与不同的 大模型进行对话,也能自定义需要的Prompt组合方式。查询接口会完成 检索+对话的功能,即先基于索引进行检索,再将检索结果和之前的输入Prompt进行(自定义)组合形成新的扩充Prompt,对话大模型并拿到结果进行解析。

1 数据连接器(Data Connectors)

数据连接器,读取文档的工具,最简单的就是读取本地文件。 LLamaIndex 的数据连接器包括

  • 本地文件、Notion、Google 文档、Slack、Discord

具体可参考Data Connectors。

2 索引结构(Index Structures)

LlamaIndex 的核心其实就是 索引结构的集合,用户可以使用索引结构或基于这些索引结构自行建图。

2.1 索引如何工作

两个概念:

  • Node(节点):即一段文本(Chunk of Text),LlamaIndex读取文档(documents)对象,并将其解析/划分(parse/chunk)成 Node 节点对象,构建起索引。
  • Response Synthesis(回复合成):LlamaIndex 进行检索节点并响应回复合成,不同的模式有不同的响应模式(比如向量查询、树形查询就不同),合成不同的扩充Prompt。

索引方式包括

  • List Index:Node顺序存储,可用关键字过滤Node
  • Vector Store Index:每个Node一个向量,查询的时候取top-k相似
  • Tree Index:树形Node,从树根向叶子查询,可单边查询,或者双边查询合并。
  • **Keyword Table Index**:每个Node有很多个Keywords链接,通过查Keyword能查询对应Node。

不同的索引方式决定了Query选择Node方式的不同。

回复合成方式包括:

  • 创建并提纯(Create and Refine),即线性依次迭代;

  • 树形总结(Tree Summarize):自底向上,两两合并,最终合并成一个回复。

3 查询接口(Query Inference)

3.1 LlamaIndex 使用模板

LlamaIndex 常用使用模版:

  1. 读取文档 (手动添加or通过Loader自动添加);
  2. 将文档解析为Nodes;
  3. 构建索引(从文档or从Nodes,如果从文档,则对应函数内部会完成第2步的Node解析)
  4. [可选,进阶] 在其他索引上构建索引,即多级索引结构
  5. 查询索引并对话大模型

LangChain vs LlamaIndex

综合构建用于生产的高性能RAG程序 就用LlamaIndex
LlamaIndex官网参考:https://www.llamaindex.ai/
python文档参考:[LlamaIndex - LlamaIndex] (https://docs.llamaindex.ai/en/stable/)

LLM官网最下方的入门项目也需要学习
入门项目

RAG Work Flow

构建RAG管道—加载数据(镊取)

在您选择的 LLM 可以处理您的数据之前,您首先需要处理数据并加载数据。这与 ML 领域的数据清理/特征工程管道或传统数据设置中的 ETL 管道有相似之处。

此引入管道通常包括三个主要阶段:

  1. 加载数据
  2. 转换数据
  3. 索引和存储数据

我们将在后面的章节中介绍索引 / 存储。在本指南中,我们将主要讨论 loader 和 transformations。

装载机

在您选择的 LLM 可以处理您的数据之前,您需要加载它。LlamaIndex 执行此作的方式是通过数据连接器(也称为 .Data Connector 从不同的数据源摄取数据并将数据格式化为对象。A 是有关该数据的数据(当前为文本,将来为图像音频)和元数据的集合。Reader Document Document

使用 SimpleDirectoryReader 加载

最容易使用的阅读器是我们的 SimpleDirectoryReader,它从给定目录中的每个文件创建文档。它内置于 LlamaIndex 中,可以读取多种格式,包括 Markdown、PDF、Word 文档、PowerPoint 幻灯片、图像、音频和视频

from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("./data").load_data()

★ ★ ★ ★ 更多教程请看官方接口文档 Loading Data (Ingestion) - LlamaIndex ★ ★ ★ ★

Starter Tutorial - LlamaIndex + 欢迎使用 Colaboratory - Colab = 在线使用代码

RAG

RAG,也称为检索增强生成,是利用个人或私域数据增强 LLM 的一种范式。通常,它包含两个阶段:

  1. 索引

    构建知识库。

  2. 查询

    从知识库检索相关上下文信息,以辅助 LLM 回答问题。

LlamaIndex 提供了工具包帮助开发者极其便捷地完成这两个阶段的工作。

索引阶段

LlamaIndex 通过提供 Data connectors(数据连接器) 和 Indexes (索引) 帮助开发者构建知识库。

该阶段会用到如下工具或组件:

  • Data connectors

    数据连接器。它负责将来自不同数据源的不同格式的数据注入,并转换为 LlamaIndex 支持的文档(Document)表现形式,其中包含了文本和元数据。

  • Documents / Nodes

    Document是 LlamaIndex 中容器的概念,它可以包含任何数据源,包括,PDF文档,API响应,或来自数据库的数据。

    Node是 LlamaIndex 中数据的最小单元,代表了一个 Document的分块。它还包含了元数据,以及与其他Node的关系信息。这使得更精确的检索操作成为可能。

  • Data Indexes

    LlamaIndex 提供便利的工具,帮助开发者为注入的数据建立索引,使得未来的检索简单而高效。

    最常用的索引是向量存储索引 - VectorStoreIndex

查询阶段

在查询阶段,RAG 管道根据的用户查询,检索最相关的上下文,并将其与查询一起,传递给 LLM,以合成响应。这使 LLM 能够获得不在其原始训练数据中的最新知识,同时也减少了虚构内容。该阶段的关键挑战在于检索、编排和基于知识库的推理。

LlamaIndex 提供可组合的模块,帮助开发者构建和集成 RAG 管道,用于问答、聊天机器人或作为代理的一部分。这些构建块可以根据排名偏好进行定制,并组合起来,以结构化的方式基于多个知识库进行推理。

该阶段的构建块包括:

  • Retrievers

    检索器。它定义如何高效地从知识库,基于查询,检索相关上下文信息。

  • Node Postprocessors

    Node后处理器。它对一系列文档节点(Node)实施转换,过滤,或排名。

  • Response Synthesizers

    响应合成器。它基于用户的查询,和一组检索到的文本块(形成上下文),利用 LLM 生成响应。

RAG管道包括:

  • Query Engines

    查询引擎 - 端到端的管道,允许用户基于知识库,以自然语言提问,并获得回答,以及相关的上下文。

  • Chat Engines

    聊天引擎 - 端到端的管道,允许用户基于知识库进行对话(多次交互,会话历史)。

  • Agents

    代理。它是一种由 LLM 驱动的自动化决策器。代理可以像查询引擎或聊天引擎一样使用。主要区别在于,代理动态地决定最佳的动作序列,而不是遵循预定的逻辑。这为其提供了处理更复杂任务的额外灵活性。

LlamaIndex个性化配置

LlamaIndexRAG 过程提供了全面的配置支持,允许开发者对整个过程进行个性化设置。常见的配置场景包括:

  • 自定义文档分块
  • 自定义向量存储
  • 自定义检索
  • 指定 LLM
  • 指定响应模式
  • 指定流式响应

注,个性化配置主要通过 LlamaIndex 提供的 ServiceContext 类实现。

配置场景示例

接下来通过简明示例代码段展示 LlamaIndex 对各种配置场景的支持。

自定义文档分块
from llama_index import ServiceContext
service_context = ServiceContext.from_defaults(chunk_size=500)
自定义向量存储
import chromadb
from llama_index.vector_stores import ChromaVectorStore
from llama_index import StorageContext

chroma_client = chromadb.PersistentClient()
chroma_collection = chroma_client.create_collection("quickstart")
# 向量存储
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
自定义检索

自定义检索中,我们可以通过参数指定查询引擎(Query Engine)在检索时请求的相似文档数。

index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(similarity_top_k=5)
指定 LLM
# 指定大语言模型
service_context = ServiceContext.from_defaults(llm=OpenAI())
指定响应模式
query_engine = index.as_query_engine(response_mode='tree_summarize')
指定流式响应
# 流式响应
query_engine = index.as_query_engine(streaming=True)
完整实例

GitHubLlamaIndex-Tutorials/03_Customization/03_Customization.ipynb at main · sugarforever/LlamaIndex-Tutorials
Colab03_Customization.ipynb - Colab

请参考03_Customization.ipynb ,这是一个基于第1课的示例实现上述的所有个性化配置:

  1. 文档分块大小:500
  2. Chromadb作为向量存储
  3. 自定义检索文档数为5
  4. 指定大模型为OpenAI的模型
  5. 响应模式为 tree_summarize
  6. 问答实现流式响应

注,响应模式会在后续课程中详细介绍。


强大的数据连接器

开源教程LlamaIndex-Tutorials/04_Data_Connectors at main · sugarforever/LlamaIndex-Tutorials
演示实例04_Data_Connectors.ipynb - Colab

文档与节点

开源教程LlamaIndex-Tutorials/05_Documents_Nodes at main · sugarforever/LlamaIndex-Tutorials

LlamaPack 新手入门

LlamaPacks - LlamaIndex

阅读全文
头像
Asuna
You are the one who can always get to me even with screen between us.