1. 分布式文件存储选型比较、架构设计
1.1 分布式文件存储的来源
在这个数据爆炸的时代,产生的数据量不断地在攀升,从GB,TB,PB,ZB.挖掘其中数据的价值也是企业在不断地追求的终极目标。但是要想对海量的数据进行挖掘,首先要考虑的就是海量数据的存储问题,比如Tb量级的数据。
谈到数据的存储,则不得不说的是磁盘的数据读写速度问题。早在上个世纪90年代初期,普通硬盘的可以存储的容量大概是1G左右,硬盘的读取速度大概为4.4MB/s.读取一张硬盘大概需要5分钟时间,但是如今硬盘的容量都在1TB左右了,相比扩展了近千倍。但是硬盘的读取速度大概是100MB/s。读完一个硬盘所需要的时间大概是2.5个小时。所以如果是基于TB级别的数据进行分析的话,光硬盘读取完数据都要好几天了,更谈不上计算分析了。那么该如何处理大数据的存储,计算分析呢?
1.2 常用的分布式文件存储
1.2.1 常见的分布式文件系统
GFS、HDFS、Lustre 、Ceph 、GridFS 、mogileFS、TFS、FastDFS等。各自适用于不同的领域。它们都不是系统级的分布式文件系统,而是应用级的分布式文件存 储服务。
分布式文件存储选型比较
整理了很多Linux后台架构师学习资料,视频,面试题, 请私信 。
1.2.2 系统整体对比
|
对比说明 /文件系统 |
TFS |
FastDFS |
MogileFS |
MooseFS |
GlusterFS |
Ceph |
|
开发语言 |
C++ |
C |
Perl |
C |
C |
C++ |
|
开源协议 |
GPL V2 |
GPL V3 |
GPL |
GPL V3 |
GPL V3 |
LGPL |
|
数据存储方式 |
块 |
文件/Trunk |
文件 |
块 |
文件/ 块 |
对象/ 文件/块 |
|
集群节点通信协议 |
私有协议(TCP ) |
私有协议(TCP ) |
HTTP |
私有协议(TCP ) |
私有协议(TCP )/ RDAM(远程直接访问内存) |
私有协议(TCP ) |
|
专用元数据存储点 |
占用NS |
无 |
占用DB |
占用MFS |
无 |
占用MDS |
|
在线扩容 |
支持 |
支持 |
支持 |
支持 |
支持 |
支持 |
|
冗余备份 |
支持 |
支持 |
- |
支持 |
支持 |
支持 |
|
单点故障 |
存在 |
不存在 |
存在 |
存在 |
不存在 |
存在 |
|
跨集群同步 |
支持 |
部分支持 |
- |
- |
支持 |
不适用 |
|
易用性 |
安装复杂,官方文档少 |
安装简单,社区相对活跃 |
- |
安装简单,官方文档多 |
安装简单,官方文档专业化 |
安装简单,官方文档专业化 |
|
适用场景 |
跨集群的小文件 |
单集群的中小文件 |
- |
单集群的大中文件 |
跨集群云存储 |
单集群的大中小文件 |
1.3 知名开源分布式文件存储
1.3.1 GFS(Google File System)
Google公司为了满足本公司需求而开发的基于Linux的专有分布式文件系统。尽管Google公布了该系统的一些技术细节,但Google并没有将该系统的软件部分作为开源软件发布。
1.3.2 HDFS
Hadoop 实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。 Hadoop是Apache Lucene创始人Doug Cutting开发的使用广泛的文本搜索库。它起源于Apache Nutch,
后者是一个开源的网络搜索引擎,本身也是Luene项目的一部分。Aapche Hadoop架构是MapReduce算法的一种开源应用,是Google开创其帝国的重要基石。
1.3.3 TFS
TFS(Taobao FileSystem)是一个高可扩展、高可用、高性能、面向互联网服务的分布式文件系统,主要针对海量的非结构化数据,它构筑在普通的Linux机器 集群上,可为外部提供高可靠
和高并发的存储访问。TFS为淘宝提供海量小文件存储,通常文件大小不超过1M,满足了淘宝对小文件存储的需求,被广泛地应用 在淘宝各项应用中。它采用了HA架构和平滑扩容,保证了整个文件系统的可用性和扩展性。同时扁平化的数据组织结构,可将文件名映射到文件的物理地址,简化 了文件的访问流程,一定程度上为TFS提供了良好的读写性能。
Google学术论文,这是众多分布式文件系统的起源,HDFS和TFS都是参考Google的GFS设计出来的。
1.4 典型的分布式文件存储的架构设计
我以hadoop的HDFS为例,毕竟开源的分布式文件存储使用的最多。
Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。
1.4.1 大规模数据集
运行在HDFS上的应用具有很大的数据集。HDFS上的一个典型文件大小一般都在G字节至T字节。因此,HDFS被调节以支持大文件存储。它应该能提供整体上高的数据传输带宽,能在一个集群里扩展到数百个节点。一个单一的HDFS实例应该能支撑数以千万计的文件。
1.4.2 简单的一致性模型
HDFS应用需要一个“一次写入多次读取”的文件访问模型。一个文件经过创建、写入和关闭之后就不需要改变。这一假设简化了数据一致性问题,并且使高吞吐量的数据访问成为可能。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。
1.4.3 异构软硬件平台间的可移植性
HDFS在设计的时候就考虑到平台的可移植性。这种特性方便了HDFS作为大规模数据应用平台的推广。
1.4.4 Namenode 和 Datanode
HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。
Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。
集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。
Namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。

Namenode和Datanode被设计成可以在普通的商用机器上运行。这些机器一般运行着GNU/Linux操作系统(OS)。HDFS采用Java语言开发,因此任何支持Java的机器都可以部署Namenode或Datanode。由于采用了可移植性极强的Java语言,使得HDFS可以部署到多种类型的机器上。一个典型的部署场景是一台机器上只运行一个Namenode实例,而集群中的其它机器分别运行一个Datanode实例。这种架构并不排斥在一台机器上运行多个Datanode,只不过这样的情况比较少见。
2. fastdfs源码分析
fastdfs是一个轻量级的分布式文件系统,主要由
- tracker server,
- storage server
以及client组成,这里主要涉及两点 :
- 1)客户端上传文件流程和协议分析
- 2)实现一个简单的文件上传函数
2.1 文件上传的基本流程

文件上传
fastdfs中上传一个文件,主要涉及以下几个步骤:
- 上传连接请求,客户端会向tracker server发出上传文件的请求
- tracker收到请求后,返回storage server的ip和端口
- 客户端连接storage,并且上传文件
- 文件上传完成后,storage返回路径信息
以下具体分析文件上传过程中的协议和各种操作
2.1.1 fastdfs协议头部
typedefstruct
{
charpkg_len[FDFS_PROTO_PKG_LEN_SIZE];//bodylength,notincludingheader(8个字节)
charcmd;//commandcodeTRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
charstatus;//statuscodeforresponse
}TrackerHeader;
fastdfs协议的头部是由10个字节大小的结构体构成,
- 发送:发送数据时,先发送TrackerHeader到服务器,随后发送具体的数据
- 接收:接收数据时,先接收sizeof(TrackerHeader)大小的报文头部,随后接收pkg_len长度的报文体
- status: 发送的时候设置为0
- cmd: 命令
- pkg_len:一个int64_t的整型,除去TrackerHeader长度的报文长度
2.2 客户端向tracker server发送获取storage地址请求
//协议头
//pkg_len|cmd|status
//8bytes|1bytes|1bytes
//向trackerserver请求storageservercmd
#defineTRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE101
TrackerHeaderheader;//协议头部
memset(&header,0,sizeof(TrackerHeader));
header.cmd=TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
//向trackerserver请求storage,tcpsenddata返回非0,表示发送成功
if(tcpsenddata(sockfd,&header,sizeof(TrackerHeader),10,&count)!=0)
{
fprintf(stderr,"tcpsenddataerror:%s\n",strerror(errno));
return1;
}
else//请求发送成功,等待tracker回复
{
//接收头部,头部是一个TrackerHeader类型,10个字节
TrackerHeaderresp;
if((ret_code=tcprecvdata(sockfd,&resp,sizeof(TrackerHeader),10,&count))!=0)
{
fprintf(stderr,"tcprecvdataerror:%s\n",strerror(ret_code));
return1;
}
//开始接收报文体
//int64_tread_int64(constchar*buff)
//{
//unsignedchar*p;
//p=(unsignedchar*)buff;
//return(((int64_t)(*p))<<56)|\
//(((int64_t)(*(p+1)))<<48)|\
//(((int64_t)(*(p+2)))<<40)|\
//(((int64_t)(*(p+3)))<<32)|\
//(((int64_t)(*(p+4)))<<24)|\
//(((int64_t)(*(p+5)))<<16)|\
//(((int64_t)(*(p+6)))<<8)|\
//((int64_t)(*(p+7)));
//}
intsize=read_int64(resp.pkg_len);//获取报体长度
char*buf=(char*)calloc(size+1,sizeof(char));
if((ret_code=tcprecvdata(sockfd,buf,size,10,&count)!=0))
{
fprintf(stderr,"tcprecvdataerror:%s\n",strerror(ret_code));
return1;
}
//报文体
//group_name|ip|port|storage_index
//16bytes|16bytes|8bytes|
//#defineTRACKER_QUERY_STORAGE_STORE_BODY_LEN40
if(count!=TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
{
fprintf(stderr,"invalidmessage");
return1;
}
//groupname
//#defineFDFS_GROUP_NAME_MAX_LEN16
chargroup_name[FDFS_GROUP_NAME_MAX_LEN+1]={0};
memcpy(group_name,buf,FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN]='\0';
//ip:port
//#defineIP_ADDRESS_SIZE16
//port:8bytes
charip[IP_ADDRESS_SIZE+1]={0};
memcpy(ip,buf+FDFS_GROUP_NAME_MAX_LEN,IP_ADDRESS_SIZE-1);
charszPort[8]={0};
memcpy(szPort,buf+FDFS_GROUP_NAME_MAX_LEN+IP_ADDRESS_SIZE-1,8);
ip[IP_ADDRESS_SIZE]='\0';
intport=read_int64(szPort);
//storageindex;
char*storage_index=buf+FDFS_GROUP_NAME_MAX_LEN+IP_ADDRESS_SIZE-1+FDFS_PROTO_PKG_LEN_SIZE;
...
}
2.3 以上步骤完成后,获取storage的ip 和 port后,就可以上传文件了
在官方的客户端中,文件操作有upload,download, append,delete等,这里只涉及upload
上传文件中,官方给出了三种方式
- 通过buffer上传,即将文件读取进内存,然后在发送
- 使用sendfile,sendfile是Linux提供的一个库函数
- 通过回调函数的方式
这里主要涉及的是第一种,通过buffer上传的方式
文件上传协议:
//文件上传协议头部
10bytes|1bytes|8bytes|6bytes|
TrackerHeader|storage_index|文件长度|文件名或者全为0)|
//storage_index是客户端向trackerserver申请storageindex时候返回的结果
//文件名如果不为空,那么取前6位,或者可以全部设置为0
//上传完成storage回复客户端协议
10bytes|16bytes|TrackerHeader.pkg_len-16bytes
TrackerHeader|groupname|remotefilename
voiduploadfile(intsockfd,constchar*filepath,char*storage_index)
{
charout_buf[512];
TrackerHeader*pHeader;
char*p=out_buf;
char*buf=NULL;
//TrackerHeader10bytes
//文件上传协议头部
//10bytes|1bytes|8bytes|6bytes|
//TrackerHeader|storage_index|文件长度|文件名或者全为0)|
pHeader=(TrackerHeader*)out_buf;
p+=sizeof(TrackerHeader);
//storageindex1bytes
*p++=*storage_index;
//filesize8bytes
longintfilesize=0;
intret=0;
//读取文件到buf,并且返回文件长度filesize
if((ret=getfilebuf(&buf,&filesize,filepath)!=0))
{
fprintf(stderr,"getfilebuffailed:%s\n",strerror(ret));
return;
}
//voidwrite_int64(int64_tn,char*buff)
//{
//unsignedchar*p;
//p=(unsignedchar*)buff;
//*p++=(n>>56)&0xFF;
//*p++=(n>>48)&0xFF;
//*p++=(n>>40)&0xFF;
//*p++=(n>>32)&0xFF;
//*p++=(n>>24)&0xFF;
//*p++=(n>>16)&0xFF;
//*p++=(n>>8)&0xFF;
//*p++=n&0xFF;
//}
write_int64(filesize,p);
//#defineFDFS_PROTO_PKG_LEN_SIZE8
p+=FDFS_PROTO_PKG_LEN_SIZE;
//ext_name
//#defineFDFS_FILE_EXT_NAME_MAX_LEN6
memset(p,0,FDFS_FILE_EXT_NAME_MAX_LEN);
p+=FDFS_FILE_EXT_NAME_MAX_LEN;
//setTrackerHeader
write_int64(p-out_buf+filesize-sizeof(TrackerHeader),pHeader->pkg_len);
//#defineSTORAGE_PROTO_CMD_UPLOAD_FILE11
pHeader->cmd=STORAGE_PROTO_CMD_UPLOAD_FILE;
pHeader->status=0;
//发送报文头部
intcount;
intret_code=0;
if((ret_code=tcpsenddata(sockfd,out_buf,p-out_buf,10,&count)!=0)){
fprintf(stderr,"tcpsenddatafailed:%s\n",strerror(errno));
return;
}
//发送报文体,具体文件数据
if((ret_code=tcpsenddata(sockfd,buf,filesize,10,&count))!=0){
fprintf(stderr,"tcpsenddatabodyfailed:%s\n",strerror(errno));
return;
}
//接收storageserver回复
//上传完成storage回复客户端协议
//10bytes|16bytes|TrackerHeader.pkg_len-16bytes
//TrackerHeader|groupname|remotefilename
TrackerHeaderresp;
if((ret_code=tcprecvdata(sockfd,&resp,sizeof(TrackerHeader),1000,&count))!=0){
fprintf(stderr,"tcprecvdatafailed:%s\n",strerror(ret_code));
return;
}
if(count!=sizeof(TrackerHeader)){
fprintf(stderr,"invalidheader");
return;
}
int64_tbodylen=read_int64(resp.pkg_len);
//接收报文体
char*in_buf=(char*)calloc(bodylen+1,sizeof(char));
if((ret_code=tcprecvdata(sockfd,in_buf,bodylen,10,&count))!=0)
{
fprintf(stderr,"readbodyfailed:%s\n",strerror(ret_code));
return;
}
//groupname
//#defineFDFS_GROUP_NAME_MAX_LEN16
chargroup_name[FDFS_GROUP_NAME_MAX_LEN+1];
memcpy(group_name,in_buf,FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN]='\0';
//remotefilename
charremote_filename[bodylen-FDFS_GROUP_NAME_MAX_LEN+1];
memcpy(remote_filename,in_buf+FDFS_GROUP_NAME_MAX_LEN,bodylen-FDFS_GROUP_NAME_MAX_LEN+1);
cout<<"groupname:"<<group_name<<endl;
cout<<"remote_filename:"<<remote_filename<<endl;
charhttpaddr[128]={0};
sprintf(httpaddr,"http://106.75.129.177:8080/%s/%s",group_name,remote_filename);
cout<<"httpaddr:"<<httpaddr<<endl;//http地址
}
以下附上完整代码, ubuntu14位, 编译器 g++,测试已通过
#include<iostream>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<errno.h>
#include<unistd.h>
#include<fcntl.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
usingnamespacestd;
#defineFDFS_GROUP_NAME_MAX_LEN16
#defineFDFS_PROTO_PKG_LEN_SIZE8
#defineIP_ADDRESS_SIZE16
//cmd
#defineTRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE101
#defineSTORAGE_PROTO_CMD_UPLOAD_FILE11
#defineTRACKER_QUERY_STORAGE_STORE_BODY_LEN(FDFS_GROUP_NAME_MAX_LEN\
+IP_ADDRESS_SIZE-1+FDFS_PROTO_PKG_LEN_SIZE+1)
#defineFDFS_FILE_EXT_NAME_MAX_LEN6
typedefstruct{
charpkg_len[FDFS_PROTO_PKG_LEN_SIZE];
charcmd;
charstatus;
}TrackerHeader;
//setsocketfdnonblocking
intsetnonblocking(intsockfd);
inttcprecvdata(intsockfd,void*data,constintsize,\
constinttimeout_ms,int*count);
inttcpsenddata(intsockfd,void*data,constintsize,\
constinttimeout_ms,int*count);
int64_tread_int64(constchar*buf);
voidwrite_int64(int64_tn,char*buf);
voiduploadfile(intsockfd,constchar*filepath,char*storage_index);
intgetfilebuf(char**buf,longint*filesize,constchar*filepath);
//applystorageaddressfromtrackerserver
intmain(){
constchar*ip="127.0.0.1";
uint16_tport=22122;
intret_code=0;
intsockfd=-1;
intcount=0;
//connecttrackerserver
if((sockfd=socket(PF_INET,SOCK_STREAM,0))<0)
{
fprintf(stderr,"socketerrnor:%s\n",strerror(errno));
return1;
}
if((ret_code=setnonblocking(sockfd))!=0)
{
fprintf(stderr,"setnonblockingerror:%s\n",strerror(ret_code));
return1;
}
structsockaddr_inaddr;
addr.sin_addr.s_addr=inet_addr(ip);
addr.sin_port=htons(port);
addr.sin_family=AF_INET;
socklen_tlen=sizeof(structsockaddr);
if(connect(sockfd,(structsockaddr*)&addr,len)<0)
{
fprintf(stderr,"connecterror:%s\n",strerror(errno));
return1;
}
TrackerHeaderheader;
memset(&header,0,sizeof(TrackerHeader));
header.cmd=TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
if(tcpsenddata(sockfd,&header,sizeof(TrackerHeader),10,&count)!=0)
{
fprintf(stderr,"tcpsenddataerror:%s\n",strerror(errno));
return1;
}
else
{
//recvheader
TrackerHeaderresp;
if((ret_code=tcprecvdata(sockfd,&resp,sizeof(TrackerHeader),10,&count))!=0)
{
fprintf(stderr,"tcprecvdataerror:%s\n",strerror(ret_code));
return1;
}
cout<<"recvheader:"<<count<<endl;
//readbody;
intsize=read_int64(resp.pkg_len);
char*buf=(char*)calloc(size+1,sizeof(char));
if((ret_code=tcprecvdata(sockfd,buf,size,10,&count)!=0))
{
fprintf(stderr,"tcprecvdataerror:%s\n",strerror(ret_code));
return1;
}
//body
//group_name|ip|port|storage_index
//16bytes|16bytes|8bytes|
cout<<"readbody:"<<count<<endl;
if(count!=TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
{
fprintf(stderr,"invalidmessage");
return1;
}
//groupname
chargroup_name[FDFS_GROUP_NAME_MAX_LEN+1]={0};
memcpy(group_name,buf,FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN]='\0';
cout<<"groupname:"<<group_name<<endl;
//ip:port
charip[IP_ADDRESS_SIZE+1]={0};
memcpy(ip,buf+FDFS_GROUP_NAME_MAX_LEN,IP_ADDRESS_SIZE-1);
charszPort[8]={0};
memcpy(szPort,buf+FDFS_GROUP_NAME_MAX_LEN+IP_ADDRESS_SIZE-1,8);
ip[IP_ADDRESS_SIZE]='\0';
intport=read_int64(szPort);
cout<<"address:"<<ip<<":"<<port<<endl;
//storageindex;
char*storage_index=buf+FDFS_GROUP_NAME_MAX_LEN+IP_ADDRESS_SIZE-1+FDFS_PROTO_PKG_LEN_SIZE;
cout<<"storage_index:"<<storage_index<<endl;
free(buf);
//connectstorageserver
sockaddr_inst_addr;
st_addr.sin_addr.s_addr=inet_addr(ip);
st_addr.sin_family=AF_INET;
st_addr.sin_port=htons(port);
intstorage_fd=socket(AF_INET,SOCK_STREAM,0);
if(storage_fd<0){
fprintf(stderr,"socketfailed:%s\n",strerror(errno));
return1;
}
socklen_tlen2=sizeof(sockaddr_in);
if(connect(storage_fd,(structsockaddr*)&st_addr,len2)<0){
fprintf(stderr,"connectfailed:%s\n",strerror(errno));
return1;
}
uploadfile(storage_fd,"1.jpg",storage_index);
}
return0;
}
intgetfilebuf(char**buf,longint*filesize,constchar*filepath)
{
intret_code=0;
FILE*fp=fopen(filepath,"rb+");
if(fp==NULL)
{
ret_code=errno;
returnret_code;
}
//getfilesize;
fseek(fp,0,SEEK_END);
*filesize=ftell(fp);
fseek(fp,0,SEEK_SET);
cout<<"getfilesize:"<<*filesize<<endl;
//mallocbuf
*buf=(char*)calloc(*filesize+1,sizeof(char));
if(*buf==NULL){
ret_code=errno;
returnret_code;
}
intread_bytes=0;
intleft_bytes=*filesize;
char*p=*buf;
while(left_bytes>0){
read_bytes=fread(p,sizeof(char),left_bytes,fp);
left_bytes-=read_bytes;
p+=read_bytes;
}
returnret_code;
}
voiduploadfile(intsockfd,constchar*filepath,char*storage_index)
{
charout_buf[512];
TrackerHeader*pHeader;
char*p=out_buf;
char*buf=NULL;
//TrackerHeader10bytes
pHeader=(TrackerHeader*)out_buf;
p+=sizeof(TrackerHeader);
//storageindex1bytes
*p++=*storage_index;
//filesize8bytes
longintfilesize=0;
intret=0;
if((ret=getfilebuf(&buf,&filesize,filepath)!=0))
{
fprintf(stderr,"getfilebuffailed:%s\n",strerror(ret));
return;
}
printf("filesize:%ld\n",filesize);
write_int64(filesize,p);
p+=FDFS_PROTO_PKG_LEN_SIZE;
//ext_name
memset(p,0,FDFS_FILE_EXT_NAME_MAX_LEN);
p+=FDFS_FILE_EXT_NAME_MAX_LEN;
//setTrackerHeader
write_int64(p-out_buf+filesize-sizeof(TrackerHeader),pHeader->pkg_len);
pHeader->cmd=STORAGE_PROTO_CMD_UPLOAD_FILE;
pHeader->status=0;
//sendheader
intcount;
intret_code=0;
if((ret_code=tcpsenddata(sockfd,out_buf,p-out_buf,10,&count)!=0)){
fprintf(stderr,"tcpsenddatafailed:%s\n",strerror(errno));
return;
}
//sendbody
if((ret_code=tcpsenddata(sockfd,buf,filesize,10,&count))!=0){
fprintf(stderr,"tcpsenddatabodyfailed:%s\n",strerror(errno));
return;
}
//recvresponse
TrackerHeaderresp;
if((ret_code=tcprecvdata(sockfd,&resp,sizeof(TrackerHeader),1000,&count))!=0){
fprintf(stderr,"tcprecvdatafailed:%s\n",strerror(ret_code));
return;
}
if(count!=sizeof(TrackerHeader)){
fprintf(stderr,"invalidheader");
return;
}
int64_tbodylen=read_int64(resp.pkg_len);
char*in_buf=(char*)calloc(bodylen+1,sizeof(char));
if((ret_code=tcprecvdata(sockfd,in_buf,bodylen,10,&count))!=0)
{
fprintf(stderr,"readbodyfailed:%s\n",strerror(ret_code));
return;
}
//groupname
chargroup_name[FDFS_GROUP_NAME_MAX_LEN+1];
memcpy(group_name,in_buf,FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN]='\0';
//remotefilename
charremote_filename[bodylen-FDFS_GROUP_NAME_MAX_LEN+1];
memcpy(remote_filename,in_buf+FDFS_GROUP_NAME_MAX_LEN,bodylen-FDFS_GROUP_NAME_MAX_LEN+1);
cout<<"groupname:"<<group_name<<endl;
cout<<"remote_filename:"<<remote_filename<<endl;
charhttpaddr[128]={0};
sprintf(httpaddr,"http://127.0.0.1:8080/%s/%s",group_name,remote_filename);
cout<<"httpaddr"<<httpaddr<<endl;
}
voidwrite_int64(int64_tn,char*buff)
{
unsignedchar*p;
p=(unsignedchar*)buff;
*p++=(n>>56)&0xFF;
*p++=(n>>48)&0xFF;
*p++=(n>>40)&0xFF;
*p++=(n>>32)&0xFF;
*p++=(n>>24)&0xFF;
*p++=(n>>16)&0xFF;
*p++=(n>>8)&0xFF;
*p++=n&0xFF;
}
int64_tread_int64(constchar*buff)
{
unsignedchar*p;
p=(unsignedchar*)buff;
return(((int64_t)(*p))<<56)|\
(((int64_t)(*(p+1)))<<48)|\
(((int64_t)(*(p+2)))<<40)|\
(((int64_t)(*(p+3)))<<32)|\
(((int64_t)(*(p+4)))<<24)|\
(((int64_t)(*(p+5)))<<16)|\
(((int64_t)(*(p+6)))<<8)|\
((int64_t)(*(p+7)));
}
intsetnonblocking(intsockfd)
{
intret_code=0;
if(fcntl(sockfd,F_SETFD,O_NONBLOCK)<0){
ret_code=errno;
}
returnret_code;
}
inttcpsenddata(intsockfd,void*data,constintsize,\
constinttimeout_ms,int*count)
{
intleft_bytes=size;
intwrite_bytes=0;
intret_code=0;
intres=0;
char*p=(char*)data;
fd_setrfds;
FD_ZERO(&rfds);
FD_SET(sockfd,&rfds);
while(left_bytes>0){
write_bytes=send(sockfd,p,left_bytes,0);
if(write_bytes>0)
{
left_bytes-=write_bytes;
p+=write_bytes;
continue;
}
elseif(write_bytes<0)
{
if(!(errno==EINTR||errno==EAGAIN||errno==EWOULDBLOCK))
{
ret_code=errno==0?errno:EINTR;
break;
}
}
else
{
ret_code=ENOTCONN;
break;
}
if(timeout_ms<=0)
{
res=select(sockfd+1,&rfds,NULL,NULL,NULL);
}
else
{
structtimevaltv;
tv.tv_usec=timeout_ms;
tv.tv_sec=0;
res=select(sockfd+1,&rfds,NULL,NULL,&tv);
}
if(res==0)
{
ret_code=ETIMEDOUT;
break;
}
if(res<0)
{
if(errno==EINTR)
{
continue;
}
ret_code=errno==0?errno:EINTR;
}
}
if(count!=NULL)
{
*count=size-left_bytes;
}
returnret_code;
}
inttcprecvdata(intsockfd,void*data,constintsize,\
constinttimeout_ms,int*count){
intleft_bytes=size;
intread_bytes=0;
intret_code=0;
intres=0;
char*p=(char*)data;
fd_setrfds;
FD_ZERO(&rfds);
FD_SET(sockfd,&rfds);
while(left_bytes>0){
read_bytes=recv(sockfd,p,left_bytes,0);
if(read_bytes>0)
{
left_bytes-=read_bytes;
p+=read_bytes;
continue;
}
elseif(read_bytes<0)
{
if(!(errno==EAGAIN||errno==EWOULDBLOCK||errno==EINTR))
{
ret_code=errno!=0?errno:EINTR;
break;
}
}
else
{
ret_code=ENOTCONN;
break;
}
if(timeout_ms<=0)
{
res=select(sockfd+1,&rfds,NULL,NULL,NULL);
}
else
{
structtimevaltv;
tv.tv_usec=timeout_ms;
tv.tv_sec=0;
res=select(sockfd+1,&rfds,NULL,NULL,&tv);
}
if(res==0)
{
ret_code=ETIMEDOUT;
break;
}
if(res<0)
{
if(errno==EINTR)
{
continue;
}
ret_code=errno==0?errno:EINTR;
break;
}
}
if(count!=NULL)
{
*count=size-left_bytes;
}
returnret_code;
}