Python实现上传Minio和阿里Oss文件
这篇文章主要介绍了如何通过Python上传Minio和阿里OSS文件,文中的示例代码介绍得很详细,对我们的工作和学习都有一定的价值,感兴趣的小伙伴可以了解一下
目录
前言
环境依赖
代码
补充
前言
本文提供Python上传minio以及阿里oss文件工具,给自己留个记录。
环境依赖
安装minio以及oss2依赖
1 2 | pip install minio - i https: / / pypi.douban.com / simple pip install oss2 - i https: / / pypi.douban.com / simple |
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | #!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2021/12/10 21:35 # @Author : 剑客阿良_ALiang # @Site : # @File : upload_tool.py # !/user/bin/env python # coding=utf-8 """ @project : dh_train @author : huyi @file : remote_upload_util.py @ide : PyCharm @time : 2021-12-10 14:58:29 """ import traceback from minio import Minio from minio.error import S3Error import oss2 def minio_file_upload(end_point: str , access_key: str , secret_key: str , bucket: str , remote_path: str , local_path: str ): try : _end_point = end_point.replace( 'https://' , ' ').replace(' http: / / ', ' ') # Create a client with the MinIO server playground, its access key # and secret key. client = Minio( _end_point, access_key = access_key, secret_key = secret_key, secure = False ) # Make 'asiatrip' bucket if not exist. found = client.bucket_exists(bucket) if not found: client.make_bucket(bucket) else : print ( "Bucket {} already exists" . format (bucket)) # Upload '/home/user/Photos/asiaphotos.zip' as object name # 'asiaphotos-2015.zip' to bucket 'asiatrip'. client.fput_object( bucket, remote_path, local_path, ) print ( "{} is successfully uploaded as " "object {} to bucket {}." . format (local_path, remote_path, bucket) ) except S3Error as e: print ( "*** minio上传文件异常 -> {} {}" . format ( str (e), traceback.format_exc())) raise Exception( "minio上传文件异常:[{}]" . format ( str (e))) def oss_file_upload(end_point: str , access_key: str , secret_key: str , bucket: str , remote_path: str , local_path: str ): try : _end_point = end_point.replace( 'https://' , ' ').replace(' http: / / ', ' ') # 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。 auth = oss2.Auth(access_key, secret_key) # yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。 # 填写Bucket名称。 bucket = oss2.Bucket(auth, _end_point, bucket) # 填写Object完整路径和本地文件的完整路径。Object完整路径中不能包含Bucket名称。 # 如果未指定本地路径,则默认从示例程序所属项目对应本地路径中上传文件。 bucket.put_object_from_file(remote_path, local_path) except S3Error as e: print ( "*** oss上传文件异常 -> {} {}" . format ( str (e), traceback.format_exc())) raise Exception( "oss上传文件异常:[{}]" . format ( str (e))) |
代码说明:
1、参数分别为endpoint(IP或者域名:端口)、accessKey、secretKey、桶名、远程文件路径、本地文件路径。
补充
Python实现Minio的下载(主要用于异地备份的中转站)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import logging from minio import Minio from minio.error import S3Error logging.basicConfig( level = logging.INFO, filename = '../mysqlbackup_downlaod.log' , filemode = 'a' , format = '%(asctime)s %(name)s %(levelname)s--%(message)s' ) file_name = "mysql_monitor.py" file_path = "C:\\Users\\lpy\\Desktop\\img\\{}" . format (file_name) def download_file(): # 创建一个客户端 minioClient = Minio( 'minio.***.com' , access_key = 'admin' , secret_key = '****' , secure = False ) try : minioClient.fget_object( bucket_name = "backup" , object_name = "mysql/dev/{}" . format (file_name), file_path = file_path ) logging.info( "file '{0}' is successfully download" . format (file_name)) except S3Error as err: logging.error( "download_failed:" , err) if __name__ = = '__main__' : download_file()<font face = "Arial, Verdana, sans-serif" ><span style = "white-space: normal;" > < / span>< / font> |
以上就是Python实现上传Minio和阿里Oss文件的详细内容
原文链接:https://blog.csdn.net/zhiweihongyan1/article/details/121866248
伪原创工具 SEO网站优化 https://www.237it.com/