• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

python3实现ftp服务功能(服务端 For Linux)

python 搞代码 4年前 (2022-01-07) 43次浏览 已收录 0个评论

这篇文章主要介绍了python3实现ftp服务功能,服务端 For Linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

 # Author by Andy # _*_ coding:utf-8 _*_ import os, sys, json, hashlib, socketserver, time base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(base_dir) from conf import userdb_set class Ftp_server(socketserver.BaseRequestHandler): user_home_dir = '' def auth(self, *args): '''验证用户名及密码''' cmd_dic = args[0] username = cmd_dic["username"] password = cmd_dic["password"] f = open(userdb_set.userdb_set(), 'r') user_info = json.load(f) if username in user_info.keys(): if password == user_info[username]: self.request.send('0'.encode()) os.chdir('/home/%s' % username) self.user_home_dir = os.popen('pwd').read().strip() data = "%s login successed" % username self.loging(data) else: self.request.send('1'.encode()) data = "%s login failed" % username self.loging(data) f.close else: self.request.send('1'.encode()) data = "%s login failed" % username self.loging(data) f.close ########################################## def get(self, *args): '''给客户端传输文件''' request_code = { '0': 'file is ready to get', '1': 'file not found!' } cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic["filename"] if os.path.isfile(filename): self.request.send('0'.encode('utf-8')) # 确认文件存在 self.request.recv(1024) self.request.send(str(os.stat(filename).st_size).encode('utf-8')) self.request.recv(1024) m = hashlib.md5() f = open(filename, 'rb') for line in f: m.update(line) self.request.send(line) self.request.send(m.hexdigest().encode('utf-8')) print('From server:Md5 value has been sended!') f.close() else: self.request.send('1'.encode('utf-8')) ########################################### def cd(self, *args): '''执行cd命令''' user_current_dir = os.popen('pwd').read().strip() cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) path = cmd_dic['path'] if path.startswith('/'): if self.user_home_dir in path: os.chdir(path) new_dir = os.popen('pwd').read() user_current_dir = new_dir self.request.send('Change dir successfully!'.encode("utf-8")) data = 'Change dir successfully!' self.loging(data) elif os.path.exists(path): self.request.send('Permission Denied!'.encode("utf-8")) data = 'Permission Denied!' self.loging(data) else: self.request.send('Directory not found!'.encode("utf-8")) data = 'Directory not found!' self.loging(data) elif os.path.exists(path): os.chdir(path) new_dir = os.popen('pwd').read().strip() if self.user_home_dir in new_dir: self.request.send('Change dir successfully!'.encode("utf-8")) user_current_dir = new_dir data = 'Change dir successfully!' self.loging(data) else: os.chdir(user_current_dir) self.request.send('Permission Denied!'.encode("utf-8")) data = 'Permission Denied!' self.loging(data) else: self.request.send('Directory not found!'.encode("utf-8")) data = 'Directory not found!' self.loging(data) ########################################### def rm(self, *args): request_code = { '0': 'file exist,and Please confirm whether to rm', '1': 'file not found!' } cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic['filename'] if os.path.exists(filename): self.request.send('0'.encode("utf-8")) # 确认文件存在 client_response = self.request.recv(1024).decode() if client_response == '0': os.popen('rm -rf %s' % filename) self.request.send(('File %s has been deleted!' % filename).encode("utf-8")) self.loging('File %s has been deleted!' % filename) else: self.request.send(('File %s not deleted!' % filename).encode("utf-8")) self.loging('File %s not deleted!' % filename) else: self.request.send('1'.encode("utf-8")) ######################################## def pwd(self, *args): '''执行pwd命令''' cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) server_response = os.popen('pwd').read().strip().encode("utf-8") self.request.send(server_response) ############################################# def ls(self, *args): '''执行ls命名''' cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) path = cmd_dic['path'] cmd = 'ls -l %s' % path server_response = os.popen(cmd).read().encode("utf-8") self.request.send(server_response) ############################################ def put(self, *args): '''接收客户端文件''' cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic["filename"] filesize = cmd_dic["size"] if os.path.isfile(filename): f = open(filename + '.new', 'wb') else: f = open(filename, 'wb') request_code = { '200': 'Ready to recceive data!', '210': 'Not ready to received data!' } self.request.send('200'.encode()) receive_size = 0 while True: if receive_size ' % localtime + data + '\n') ############################################## def handle(self): # print("您本次访问使用的IP为:%s" %self.client_address[0]) # localtime = time.asctime( time.localtime(time.time())) # print(localtime) while True: try: self.data = self.request.recv(1024).decode() # # print(self.data) cmd_dic = json.loads(self.data) action = cmd_dic["action"] # print("用户请求%s"%action) if hasattr(self, action): func = getattr(self, action) func(cmd_dic) except Exception as e: self.loging(str(e)) break def run(): HOST, PORT = '0.0.0.0', 6969 print("The server is started,and listenning at port 6969") server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server) server.serve_forever() if __name__ == '__main__': run() 

设置用户口令代码:

 #Author by Andy #_*_ coding:utf-8 _*_ import os,json,hashlib,sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) userdb_file = base_dir+"\data\\userdb" # print(userdb<p style="color:transparent">来源gao!%daima.com搞$代*!码$网</p>_file) def userdb_set(): if os.path.isfile(userdb_file): # print(userdb_file) return userdb_file else: print('请先为您的服务器创建用户!') user_data = {} dict={} Exit_flags = True while Exit_flags: username = input("Please input username:") if username != 'exit': password = input("Please input passwod:") if password != 'exit': user_data.update({username:password}) m = hashlib.md5() # m.update('hello') # print(m.hexdigest()) for i in user_data: # print(i,user_data[i]) m.update(user_data[i].encode()) dict.update({i:m.hexdigest()}) else: break else: break f = open(userdb_file,'w') json.dump(dict,f) f.close() return userdb_file 

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持gaodaima搞代码网

以上就是python3实现ftp服务功能(服务端 For Linux)的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:python3实现ftp服务功能(服务端 For Linux)

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址