Archive

Posts Tagged ‘Python’

FastAPI 使用JWT认证的中间件

May 25th, 2020 No comments

FastAPI 使用JWT认证的中间件
fastapi的中间件还是太少,单独开发JWT需要,starlette本身提供认证相关实现,只需要自定义一个AuthenticationBackend即可,本次我们实现使用中间价方式拆包JWT的令牌,获取payload里面的用户信息

私有定义的payload内容格式如下

{
"usid": "SkDQBhEjUfygRSeEBech", //UUID Short
"uname": "test user name",  //Username
"mid":"700010001" // Member ID
}

调用代码

app = FastAPI()
app.add_middleware(AuthenticationMiddleware,backend=JWTAuthenticationBackend(secret_key="YOUR_SECRET_KEY"))

完整的代码

import jwt
 
from starlette.authentication import (
    AuthenticationBackend, AuthenticationError, BaseUser, AuthCredentials,
    UnauthenticatedUser )
 
 
class JWTUser(BaseUser):
    def __init__(self, user_id_short: str, member_number: str, user_name: str,token: str, payload: dict) -> None:
        self.user_name = user_name
        self.user_id_short = user_id_short
        self.member_number = member_number
        self.token = token
        self.payload = payload
 
    @property
    def is_authenticated(self) -> bool:
        return True
 
    @property
    def display_name(self) -> str:
        return self.user_name   ## 会员名处理,加*?
 
    @property
    def display_user_id_short(self) -> str:
        return self.user_id_short   ## 会员id处理
 
    @property
    def display_member_number(self) -> str:
        return self.member_number  ## 会员号处理,加*?
 
 
class JWTAuthenticationBackend(AuthenticationBackend):
 
    def __init__(self, secret_key: str, algorithm: str = 'HS256', prefix: str = 'JWT', user_name_field:str = 'uname' , user_id_field: str = 'usid',member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.prefix = prefix
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field
 
 
    @classmethod
    def get_token_from_header(cls, authorization: str, prefix: str):
        """
        Parses the Authorization header and returns only the token
        :param authorization:
        :return:
        """
        try:
            scheme, token = authorization.split()
        except ValueError:
            raise AuthenticationError('Could not separate Authorization scheme and token')
        if scheme.lower() != prefix.lower():
            raise AuthenticationError(f'Authorization scheme {scheme} is not supported')
        return token
 
    async def authenticate(self, request):
        if "Authorization" not in request.headers:
            return None
 
        auth = request.headers["Authorization"]
        token = self.get_token_from_header(authorization=auth, prefix=self.prefix)
        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=self.algorithm)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e))
 
        return AuthCredentials(["jwt_authenticated"]), JWTUser(user_id_short=payload[self.user_id_field], member_number=payload[self.member_number_field],user_name=payload[self.user_name_field],token=token,
                                                           payload=payload)
 
 
class JWTWebSocketAuthenticationBackend(AuthenticationBackend):
 
    def __init__(self, secret_key: str, algorithm: str = 'HS256', query_param_name: str = 'jwt',
                 user_name_field:str = 'uname' , user_id_field: str = 'usid',member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.query_param_name = query_param_name
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field
 
    async def authenticate(self, request):
        if self.query_param_name not in request.query_params:
            return AuthCredentials(), UnauthenticatedUser()
 
        token = request.query_params[self.query_param_name]
 
        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=self.algorithm)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e))
 
        return AuthCredentials(["jwt_authenticated"]), JWTUser(user_id_short=payload[self.user_id_field], member_number=payload[self.member_number_field],user_name=payload[self.user_name_field],token=token,
                                                           payload=payload)
Categories: 语言编程 Tags: , ,

nohup 后台运行python程序print无输出

July 26th, 2019 No comments

使用nohup后台运行python,print没有输出到日志

nohup python foobar.py > foobar.log 2>&1 &

发现foobar.log中显示不出来python程序中print的东西。
这是因为python的输出有缓冲,导致foobar.log并不能够马上看到输出。
python 有个-u参数,使得python不启用缓冲。

nohup python -u foobar.py > foobar.log 2>&1 &

其他玩法:
只输出错误到日志
# Only error write to log
nohup python -u ./foobar.py> /dev/null 2>foobar.log &
不输出到日志
# Nothing to Display
nohup python -u ./foobar.py> /dev/null 2>&1 &
全部print输出到日志
# Write all to log
nohup python -u ./foobar.py> ./foobar.log 2>&1 &

https://blog.csdn.net/Statham_stone/article/details/78290813

Categories: 系统管理 Tags:

[Python]PostgreSQL字典/JSON类型递归自展开

July 15th, 2019 No comments

PostgreSql 习惯上会将特殊数据类型的各个节点按字典/JSON类型存储
程序中需要获得完整的数据信息的时候,需要对这个节点进行自展开。

以下使用global id方式进行展开,一般适用于SQL+NoSQL结合的系统使用

import sys, os
import numpy as np
 
def get_object_by_gid(id):
	for dict in data["json"]:
		if dict["gid"] == id:
	return dict.copy()
 
def self_exact_node(key):
	dict = get_object_by_gid(key)
	for k,v in dict.items():
		if k == "sub_item" :
			item_arr = []
			for id in v["gids"]:
				item_arr.append(self_exact_node(id))
			v["item_arr"] = item_arr.copy()
	return dict
 
def demo():
	data_exact = data.copy()
	for d in data_exact["json"]:
		d = self_exact_node(d["gid"])
 
def main():
	demo()
 
if __name__ == '__main__’:
	sys.exit(main())

其他玩法

# Global ID方式
"gid": "大分类2:中分类2:子分类2",
# 复合ID
"id":{"l1_cat":"大分类1","l2_cat":"中分类2","l3_cat":"子分类2"}
# 链表式
"chain":{"next_gid":"","pre_gid":"","head_gid":"","tail_gid":""}
# 二叉树式
"btree":{"next_sibling":"","child":”"}

Read more…

Categories: 语言编程 Tags: , , ,

[Python]使用OpenCV实现伪彩色和热力图

July 15th, 2019 No comments

使用applyColorMap可以对单个通道的图像进行伪彩色处理和热力图
OpenCV的定义了12种colormap常数,选择一个需要的即可
cv2.applyColorMap(heatmap_g, cv2.COLORMAP_JET)
图像可以使用addWeighted进行叠加处理
cv2.addWeighted(heatmap_img, alpha, merge_img, 1-alpha, 0, merge_img) # 将热度图覆盖到原图

def heatmap_overlay(image,heatmap):
    # 灰度化heatmap
    heatmap_g = heatmap.astype(np.uint8)
    # 热力图伪彩色
    heatmap_color = cv2.applyColorMap(heatmap_g, cv2.COLORMAP_JET)
    # overlay热力图
    merge_img = image.copy()
    heatmap_img = heatmap_color.copy()
    overlay = image.copy()
    alpha = 0.25 # 设置覆盖图片的透明度
    #cv2.rectangle(overlay, (0, 0), (merge_img.shape[1], merge_img.shape[0]), (0, 0, 0), -1) # 设置蓝色为热度图基本色
    cv2.addWeighted(overlay, alpha, merge_img, 1-alpha, 0, merge_img) # 将背景热度图覆盖到原图
    cv2.addWeighted(heatmap_img, alpha, merge_img, 1-alpha, 0, merge_img) # 将热度图覆盖到原图
    return merge_img

参考:
https://blog.csdn.net/u013381011/article/details/78341861

Categories: 语言编程 Tags: ,

[Python]使用OpenCV实现图像和视频转换操作

July 15th, 2019 No comments

将视频按FPS拆解成单张图片
使用cv2.VideoCapture
cv2.VideoCapture(video_path)
计算FPS使用,注意部分压缩视频FPS存在丢帧情况,需要进行跳帧处理
fps = int(vidcap.get(cv2.CAP_PROP_FPS))

def video_split():
  video_path = 'test/video/video_01.mp4'
  video_name = video_path[:-4]
  vidcap = cv2.VideoCapture(video_path)
  success,image = vidcap.read()
  fps = int(vidcap.get(cv2.CAP_PROP_FPS))
  count = 0
  while success:
    image = image_process(image)
    cv2.imwrite("%s/%d.jpg" % (video_name, count), image)
    #if count % fps == 0:
    #    cv2.imwrite("%s/%d.jpg" % (video_name, int(count / fps)), image)
    print('Process %dth seconds: ' % int(count / fps), success)
    success,image = vidcap.read()
    count += 1

Read more…

Categories: 语言编程 Tags: ,

[Python]使用OpenCV进行轮廓检索

July 15th, 2019 No comments

对遮罩层进行轮廓检索并合并到图像上
第一步使用高斯模糊GaussianBlur模糊边缘像素
第二步使用Canny侦测边界,丢弃部分散点
最后使用findContours找到外框

#对遮罩层进行轮廓检索并合并到图像上
def drawMaskContoursOverImage(image,mask):
    # convert colorspace
    gray = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    #image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
 
    # 3*3 GaussianBlur
    # gray = cv2.GaussianBlur(gray, (3, 3), 0)
    # canny detect edge
    gray = cv2.Canny(gray, 100, 300)
 
    ret, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
 
    # binary是最后返回的二值图像
    #findContours()第一个参数是源图像、第二个参数是轮廓检索模式,第三个参数是轮廓逼近方法
    #输出是轮廓和层次结构,轮廓是图像中所有轮廓的python列表,每个单独的轮廓是对象边界点的(x,y)坐标的Numpy数组
    binary, contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(image, contours, -1, (0, 255, 0), 1)
 
    # 写图像
    cv2.imwrite('%s.masked.png'%pair[0],image,[int(cv2.IMWRITE_PNG_COMPRESSION),3])

参考
https://blog.csdn.net/dz4543/article/details/80655067

Categories: 语言编程 Tags: ,

解决Python Error ‘TSaslClientTransport’ object has no attribute ‘trans’

November 12th, 2018 No comments

解决Python Error ‘TSaslClientTransport’ object has no attribute ‘trans’
原因应该是thrift和impyla包版本的问题

sudo pip uninstall thrift
sudo pip uninstall impyla
sudo pip install thrift==0.9.3
sudo pip install impyla==0.13.8

参考
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Python-Error-TSaslClientTransport-object-has-no-attribute/m-p/58033

Categories: 系统管理 Tags: ,