I/O完成端口模型


1.完成端口模型

一、基本概念

完成端口是一种内核对象,它可以与多个文件描述符(如套接字、文件句柄等)相关联,并且允许多个线程同时处理这些文件描述符上的 I/O 操作。

核心思想是将 I/O 操作的完成通知与线程的分配和管理进行有效的分离,以实现高效的并发处理。

“I/O 操作的完成通知” 的意思是当 I/O 操作(如读取文件、接收网络数据等)完成之后,系统会发出一个通知信息。这种机制可以避免应用程序不断地轮询 I/O 状态以确定操作是否完成,从而提高了系统的效率和响应性能。

image-20241006164742093

二、工作原理

主线程做监听线程

服务线程作为操作线程

image-20241006170319980

  1. 创建完成端口
    • 应用程序首先创建一个完成端口对象。这个对象用于管理和分配系统资源,以及协调多个线程处理 I/O 操作。
  2. 关联文件描述符和完成端口
    • 将需要进行 I/O 操作的文件描述符(如套接字)与完成端口进行关联。这样,当在这些文件描述符(套接字)上发生 I/O 操作完成事件时,系统会将通知发送到对应的完成端口。
  3. 创建工作线程
    • 应用程序创建一组工作线程,这些线程会在完成端口上等待 I/O 操作的完成通知。通常,工作线程的数量可以根据系统的硬件资源(如处理器核心数量)进行调整,以达到最佳的性能。
  4. I/O 操作与完成通知
    • 当应用程序发起一个异步 I/O 操作(如读取数据从网络套接字),系统会在后台执行这个操作。一旦操作完成,系统会将一个完成通知放入完成端口。
  5. 工作线程处理完成通知
    • 工作线程会在完成端口上阻塞等待,当有完成通知到达时,线程会被唤醒并处理这个通知。通知中包含了关于完成的 I/O 操作的信息,如操作的类型、数据长度等。工作线程可以根据这些信息进行进一步的处理,如将读取的数据进行处理或者发送响应数据。

三、优势

  1. 高效的并发处理
    • 完成端口能够有效地管理大量的并发 I/O 操作,通过合理地分配工作线程,可以充分利用系统资源,提高应用程序的性能和响应速度。
  2. 减少线程上下文切换
    • 由于工作线程是在完成端口上等待 I/O 操作的完成通知,而不是频繁地进行轮询或者阻塞在单个 I/O 操作上,因此可以减少线程的上下文切换次数,降低系统开销。
  3. 可扩展性
    • 完成端口模型可以很容易地扩展到处理大量的并发连接,只需要根据系统资源增加工作线程的数量即可。

四、应用场景

  1. 高性能服务器应用
    • 对于需要处理大量并发连接的服务器应用,如网络服务器、数据库服务器等,完成端口 I/O 模型可以提供高效的 I/O 处理能力,保证服务器的性能和响应速度。
  2. 大规模文件传输
    • 在进行大规模文件传输时,完成端口可以有效地管理多个文件的 I/O 操作,提高文件传输的效率。
  3. 异步数据库访问
    • 对于需要进行异步数据库访问的应用程序,完成端口可以与数据库连接进行结合,实现高效的数据库查询和更新操作。

2.代码和勘误:

注意:

本代码来自于《windows网络与通信程序设计》第四章

其中在VS2022中进行编译运行的过程中有一定的错误,曾困扰许久

问题:

在发送完一条消息之后,在服务线程中再次投递的过程中,出现内存访问异常

QQ图片20241007112537

参数类型的的错误

VS2022会检测到下面代码中的 (DWORD)&pPerHandle出现错误

1
2
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion,
&dwTrans, (DWORD)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);

但是无法检测到下面代码中的 (DWORD)pPerHandle出现错误,于是出现参数的不一致,导致出现上述问题

1
2
//将新接收的客户端连接绑定到完成端口对象hCompletion上
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (DWORD)pPerHandle, 0);

解决办法:

将上述的两条依次代码修改为

1
2
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion,
&dwTrans, (PULONG_PTR)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);
1
2
//将新接收的客户端连接绑定到完成端口对象hCompletion上
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (ULONG_PTR)pPerHandle, 0);

完整代码如下:

Init.h文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#pragma once
#include <winsock2.h>
#pragma comment(lib, "WS2_32")
class CInitSock
{
public:
CInitSock(BYTE minorVer = 2, BYTE majorVer = 2)
{
WSADATA wsaData;
WORD sockVersion = MAKEWORD(minorVer, majorVer);
if (::WSAStartup(sockVersion, &wsaData) != 0)
{
exit(0);
}
}
~CInitSock()
{
::WSACleanup();
}
};

Server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#include "init.h"
#include <stdio.h>
#include <windows.h>

// 初始化Winsock库
CInitSock theSock;

#define BUFFER_SIZE 1024

typedef struct _PER_HANDLE_DATA // per-handle数据
{
SOCKET s; // 对应的套节字句柄
sockaddr_in addr; // 客户方地址
} PER_HANDLE_DATA, *PPER_HANDLE_DATA;


typedef struct _PER_IO_DATA // per-I/O数据
{
OVERLAPPED ol; // 重叠结构
char buf[BUFFER_SIZE]; // 数据缓冲区
int nOperationType; // 操作类型
#define OP_READ 1
#define OP_WRITE 2
#define OP_ACCEPT 3
} PER_IO_DATA, *PPER_IO_DATA;


//服务线程
DWORD WINAPI ServerThread(LPVOID lpParam)//函数接受一个LPVOID类型的参数lpParam,这通常是一个通用指针,可以指向任何类型的数据
{
// 得到完成端口对象句柄
HANDLE hCompletion = (HANDLE)lpParam;

DWORD dwTrans;//定义一个变量用于存储传输的字节数
PPER_HANDLE_DATA pPerHandle;//用于存储与套接字相关的数据
PPER_IO_DATA pPerIO;//用于存储与套接字相关的 I/O 操作数据

while(TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
//调用GetQueuedCompletionStatus函数
//在完成端口hCompletion上等待 I/O 操作完成。
//这个函数会阻塞直到有一个 I/O 操作完成,并将完成的信息填充到相应的变量中,包括传输的字节数dwTrans、与套接字相关的结构体指针pPerHandle和与 I/O 操作相关的结构体指针pPerIO
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion,
&dwTrans, (PULONG_PTR)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);
if(!bOK) // 在此套节字上有错误发生
{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

if(dwTrans == 0 && // 套节字被对方关闭
(pPerIO->nOperationType == OP_READ || pPerIO->nOperationType == OP_WRITE))

{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

switch(pPerIO->nOperationType) // 通过per-I/O数据中的nOperationType域查看什么I/O请求完成了
{
case OP_READ: // 完成一个接收请求
{
pPerIO->buf[dwTrans] = '\0';//在接收缓冲区的末尾添加字符串结束符
printf(pPerIO -> buf);//打印接收到的数据

// 继续投递接收I/O请求
WSABUF buf;
buf.buf = pPerIO->buf ;
buf.len = BUFFER_SIZE;
pPerIO->nOperationType = OP_READ;

DWORD nFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwTrans, &nFlags, &pPerIO->ol, NULL);
}
break;
case OP_WRITE: // 本例中没有投递这些类型的I/O请求
case OP_ACCEPT:
break;
}
}
return 0;
}


void main()
{
int nPort = 4567;
// 创建完成端口对象,创建工作线程处理完成端口对象中事件
// 0 代表完成端口线程数量和CPU线程一致
HANDLE hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
::CreateThread(NULL, 0, ServerThread, (LPVOID)hCompletion, 0, 0);

// 创建监听套节字,绑定到本地地址,开始监听
SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
//绑定
::bind(sListen, (sockaddr*)&si, sizeof(si));
//开启监听
::listen(sListen, 5);

// 循环处理到来的连接
while(TRUE)
{
// 等待接受未决(新)的连接请求
SOCKADDR_IN saRemote;
int nRemoteLen = sizeof(saRemote);
SOCKET sNew = ::accept(sListen, (sockaddr*)&saRemote, &nRemoteLen);

// 接受到新连接之后,为它创建一个per-handle数据,并将它们关联到完成端口对象。
//创建一个per-handle对象
PPER_HANDLE_DATA pPerHandle =
(PPER_HANDLE_DATA)::GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
//设置per-handle对象的目标套接字为sNew 设置客户方的地址信息(初始化)
pPerHandle->s = sNew;
memcpy(&pPerHandle->addr, &saRemote, nRemoteLen);

//将新接收的客户端连接绑定到完成端口对象hCompletion上
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (ULONG_PTR)pPerHandle, 0);

// 投递一个接收请求
//分配一个重叠请求包
PPER_IO_DATA pPerIO = (PPER_IO_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
pPerIO->nOperationType = OP_READ;//初始化为读操作
WSABUF buf;
buf.buf = pPerIO->buf;//pPerIO->buf中存储了在异步 I/O 操作中读取或写入的数据。
buf.len = BUFFER_SIZE;

DWORD dwRecv;
DWORD dwFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIO->ol, NULL);
//&pPerIO->ol是重叠对象的指针
}
}


3.函数

1.CreateIoCompletionPort函数

CreateIoCompletionPort 函数有两个功能:

(1)创建一个完成端口对象

(2)把一个IO句柄(套接字)和完成端口关联起来

1
2
3
4
5
6
7
HANDLE CreateIoCompletionPort (
HANDLE FileHandle, //文件句柄,可以是文件、套接字等任何支持异步 I/O 的对象的句柄
HANDLE ExistingCompletionPort, //已存在的 I/O 完成端口句柄。如果为NULL,则函数会创建一个新的 I/O 完成端口
ULONG_PTR CompletionKey, //完成键。这是一个应用程序定义的值,与每个关联到 I/O 完成端口的文件句柄相关联。当 I/O 操作完成时,这个值会和完成数据包一起传递给完成端口,以便应用程序可以识别与之关联的特定资源或上下文信息
DWORD NumberOfConcurrentThreads //并发执行的线程数量
);

image-20241007111105081

2.GetQueuedCompletionStatus函数

作用就是取得完成端口的结果

这个函数会阻塞直到有一个 I/O 操作完成,并将完成的信息填充到相应的变量中,包括传输的字节数dwTrans、与套接字相关的结构体指针pPerHandle和与 I/O 操作相关的结构体指针pPerIO

1
2
3
4
5
6
7
8
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort, //完成端口,用于从该完成端口获取已完成的 I/O 操作的状态信息
LPDWORD lpNumberOfBytes, //表明这次的操作传递了多少个字节的数据
PULONG_PTR lpCompletionKey, //指向一个ULONG_PTR类型的变量的指针,该变量将接收与完成的 I/O 操作相关联的完成键(一般存储的就是用户传入的I/O句柄如套接字等)
LPOVERLAPPED *lpOverlapped, // buffer,保存的IO操作结果
DWORD dwMilliseconds // 可选的超时时间值,以毫秒为单位
);

3.WSARecv函数

1
2
3
4
5
6
7
8
9
10
int WSARecv( 
SOCKET s, //目标套接字
LPWSABUF lpBuffers, //用于指定接收数据的缓冲区
DWORD dwBufferCount, //有多少个缓冲区用于接收数据
LPDWORD lpNumberOfBytesRecvd,//该变量在函数返回时将包含实际接收到的字节数
LPDWORD lpFlags, //接受模式
LPWSAOVERLAPPED lpOverlapped, //该结构用于关联异步操作并提供状态信息。如果套接字不是在重叠模式下操作,这个参数可以为NULL
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

image-20241007111947263

4.WSASend函数

1
2
3
4
5
6
7
8
9
10
int WSASend( 
SOCKET s, //目标套接字
LPWSABUF lpBuffers, //指定了要发送的数据缓冲区
DWORD dwBufferCount, //发送的缓冲区数量
LPDWORD lpNumberOfBytesSent, //该变量将接收实际发送的字节数
DWORD dwFlags, //标志参数
LPWSAOVERLAPPED lpOverlapped, //如果在重叠 I/O 模式下使用,这是一个指向WSAOVERLAPPED结构体的指针,用于异步操作。如果不是重叠 I/O,则为NULL
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine //果在重叠 I/O 模式下使用,这是一个指向完成例程函数的指针
);

5.个人课程作业附代码

作业4:基于IOCP模型编写可伸缩网络通信服务器程序。描述如下:
(1)基于IOCP模型,要求给完成端口指定的并发线程数量= CPU核数+2,业务服务线程数=1;
(2)数据包采用c结构体:
struct send_packet
{
byte len; //结构体长度,也就是包长度
byte type ; //协议类型
byte cmd; //命令码
char data[64]; //数据
};
(3)自动分包机制;
(4)服务端套接字收到客户端发来的数据后,保存到磁盘文件,文件名:file_日期.txt,如:file_2024-09-29.txt,然后向客户端回应消息,结构如下:
struct response_packet
{
byte len; //结构体长度,也就是包长度
byte err ; //大于0:成功, 小于0:失败
char msg[32]; //返回消息描述
};
(5)要求开发语言C++

Init.h文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#pragma once
#include <winsock2.h>
#pragma comment(lib, "WS2_32")
class CInitSock
{
public:
CInitSock(BYTE minorVer = 2, BYTE majorVer = 2)
{
WSADATA wsaData;
WORD sockVersion = MAKEWORD(minorVer, majorVer);
if (::WSAStartup(sockVersion, &wsaData) != 0)
{
exit(0);
}
}
~CInitSock()
{
::WSACleanup();
}
};

Server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#define _CRT_SECURE_NO_WARNINGS
#include "init.h"
#include <stdio.h>
#include <windows.h>
#include<fstream>
#include<ctime>

// 初始化Winsock库
CInitSock theSock;

#define BUFFER_SIZE 1024
typedef struct _PER_HANDLE_DATA // per-handle数据
{
SOCKET s; // 对应的套节字句柄
sockaddr_in addr; // 客户方地址
} PER_HANDLE_DATA, * PPER_HANDLE_DATA;


typedef struct _PER_IO_DATA // per-I/O数据
{
OVERLAPPED ol; // 重叠结构
char buf[BUFFER_SIZE]; // 数据缓冲区
int nOperationType; // 操作类型
#define OP_READ 1
#define OP_WRITE 2
#define OP_ACCEPT 3
} PER_IO_DATA, * PPER_IO_DATA;

//接受数据包结构
struct send_packet
{
byte len; //结构体长度,也就是包长度
byte type; //协议类型
byte cmd; //命令码
char data[64]; //数据
};

//回复数据包结构
struct response_packet
{
byte len; //结构体长度,也就是包长度
byte err; //大于0:成功, 小于0:失败
char msg[32]; //返回消息描述
};

void SaveToFile(const char* data, int len) {

time_t now = time(0);
struct tm tstruct;
char buf[80];
localtime_s(&tstruct, &now);
strftime(buf, sizeof(buf), "file_%Y-%m-%d.txt", &tstruct);
std::ofstream ofs(buf, std::ios::app);
if (ofs) {
ofs.write(data, len);
ofs.close();
}
}


//服务线程
DWORD WINAPI ServerThread(LPVOID lpParam)//函数接受一个LPVOID类型的参数lpParam,这通常是一个通用指针,可以指向任何类型的数据
{
// 得到完成端口对象句柄
HANDLE hCompletion = (HANDLE)lpParam;

DWORD dwTrans;//定义一个变量用于存储传输的字节数
PPER_HANDLE_DATA pPerHandle;//用于存储与套接字相关的数据
PPER_IO_DATA pPerIO;//用于存储与套接字相关的 I/O 操作数据

while (TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
//调用GetQueuedCompletionStatus函数
//在完成端口hCompletion上等待 I/O 操作完成。
//这个函数会阻塞直到有一个 I/O 操作完成,并将完成的信息填充到相应的变量中,包括传输的字节数dwTrans、与套接字相关的结构体指针pPerHandle和与 I/O 操作相关的结构体指针pPerIO
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion,
&dwTrans, (PULONG_PTR)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);
if (!bOK) // 在此套节字上有错误发生
{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

if (dwTrans == 0 && // 套节字被对方关闭
(pPerIO->nOperationType == OP_READ || pPerIO->nOperationType == OP_WRITE))

{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

if (pPerIO->nOperationType == OP_READ)
{
pPerIO->buf[dwTrans] = '\0';//在接收缓冲区的末尾添加字符串结束符
send_packet* packet = new send_packet();
packet->len = dwTrans;
strcpy_s(packet->data, pPerIO->buf);
SaveToFile(packet->data, packet->len);
printf(packet->data);//打印接收到的数据


//回复数据
response_packet response;
response.len = sizeof(response_packet);
response.err = 1;
strcpy_s(response.msg, "成功收到!");
WSABUF sendBuf;
sendBuf.buf = reinterpret_cast<char*>(&response);
sendBuf.len = sizeof(response);
WSASend(pPerHandle->s, &sendBuf, 1, &dwTrans, 0, NULL, NULL);

// 继续投递接收I/O请求
WSABUF buf;
buf.buf = pPerIO->buf;
buf.len = BUFFER_SIZE;
pPerIO->nOperationType = OP_READ;

DWORD nFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwTrans, &nFlags, &pPerIO->ol, NULL);
}
}
return 0;
}


void main()
{
int nPort = 4567;
// 创建完成端口对象,创建工作线程处理完成端口对象中事件
//要求给完成端口指定的并发线程数量= CPU核数+2,业务服务线程数=1;
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int numThreads = sysInfo.dwNumberOfProcessors;
HANDLE hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, numThreads + 2);
::CreateThread(NULL, 0, ServerThread, (LPVOID)hCompletion, 0, 0);

// 创建监听套节字,绑定到本地地址,开始监听
SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
//绑定
::bind(sListen, (sockaddr*)&si, sizeof(si));
//开启监听
::listen(sListen, 5);

// 循环处理到来的连接
while (TRUE)
{

// 等待接受未决(新)的连接请求
SOCKADDR_IN saRemote;
int nRemoteLen = sizeof(saRemote);
SOCKET sNew = ::accept(sListen, (sockaddr*)&saRemote, &nRemoteLen);
if (sNew == INVALID_SOCKET)
{
int errorCode = WSAGetLastError();
printf("accept failed, error code=%d\n", errorCode);
continue;
}
// 接受到新连接之后,为它创建一个per-handle数据,并将它们关联到完成端口对象。
//创建一个per-handle对象
PPER_HANDLE_DATA pPerHandle =
(PPER_HANDLE_DATA)::GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
//设置per-handle对象的目标套接字为sNew 设置客户方的地址信息(初始化)
pPerHandle->s = sNew;
memcpy(&pPerHandle->addr, &saRemote, nRemoteLen);

//将新接收的客户端连接绑定到完成端口对象hCompletion上
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (ULONG_PTR)pPerHandle, 0);

// 投递一个接收请求
//分配一个重叠请求包
PPER_IO_DATA pPerIO = (PPER_IO_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
pPerIO->nOperationType = OP_READ;//初始化为读操作
WSABUF buf;
buf.buf = pPerIO->buf;//pPerIO->buf中存储了在异步 I/O 操作中读取或写入的数据。
buf.len = BUFFER_SIZE;

DWORD dwRecv;
DWORD dwFlags = 0;
WSARecv(pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIO->ol, NULL);

}
}

5.个人课程作业附代码

效果展示:

image-20241007115912837

image-20241007115923841

image-20241007115941774

Init.h文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#pragma once
#include <winsock2.h>
#pragma comment(lib, "WS2_32")
class CInitSock
{
public:
CInitSock(BYTE minorVer = 2, BYTE majorVer = 2)
{
WSADATA wsaData;
WORD sockVersion = MAKEWORD(minorVer, majorVer);
if (::WSAStartup(sockVersion, &wsaData) != 0)
{
exit(0);
}
}
~CInitSock()
{
::WSACleanup();
}
};

Server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#define _CRT_SECURE_NO_WARNINGS
#include "init.h"
#include <stdio.h>
#include <windows.h>
#include<fstream>
#include<ctime>

// 初始化Winsock库
CInitSock theSock;

#define BUFFER_SIZE 1024
typedef struct _PER_HANDLE_DATA // per-handle数据
{
SOCKET s; // 对应的套节字句柄
sockaddr_in addr; // 客户方地址
} PER_HANDLE_DATA, * PPER_HANDLE_DATA;


typedef struct _PER_IO_DATA // per-I/O数据
{
OVERLAPPED ol; // 重叠结构
char buf[BUFFER_SIZE]; // 数据缓冲区
int nOperationType; // 操作类型
#define OP_READ 1
#define OP_WRITE 2
#define OP_ACCEPT 3
} PER_IO_DATA, * PPER_IO_DATA;

//接受数据包结构
struct send_packet
{
byte len; //结构体长度,也就是包长度
byte type; //协议类型
byte cmd; //命令码
char data[64]; //数据
};

//回复数据包结构
struct response_packet
{
byte len; //结构体长度,也就是包长度
byte err; //大于0:成功, 小于0:失败
char msg[32]; //返回消息描述
};

void SaveToFile(const char* data, int len) {

time_t now = time(0);
struct tm tstruct;
char buf[80];
localtime_s(&tstruct, &now);
strftime(buf, sizeof(buf), "file_%Y-%m-%d.txt", &tstruct);
std::ofstream ofs(buf, std::ios::app);
if (ofs) {
ofs.write(data, len);
ofs.close();
}
}


//服务线程
DWORD WINAPI ServerThread(LPVOID lpParam)//函数接受一个LPVOID类型的参数lpParam,这通常是一个通用指针,可以指向任何类型的数据
{
// 得到完成端口对象句柄
HANDLE hCompletion = (HANDLE)lpParam;

DWORD dwTrans;//定义一个变量用于存储传输的字节数
PPER_HANDLE_DATA pPerHandle;//用于存储与套接字相关的数据
PPER_IO_DATA pPerIO;//用于存储与套接字相关的 I/O 操作数据

while (TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
//调用GetQueuedCompletionStatus函数
//在完成端口hCompletion上等待 I/O 操作完成。
//这个函数会阻塞直到有一个 I/O 操作完成,并将完成的信息填充到相应的变量中,包括传输的字节数dwTrans、与套接字相关的结构体指针pPerHandle和与 I/O 操作相关的结构体指针pPerIO
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion,
&dwTrans, (PULONG_PTR)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);
if (!bOK) // 在此套节字上有错误发生
{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

if (dwTrans == 0 && // 套节字被对方关闭
(pPerIO->nOperationType == OP_READ || pPerIO->nOperationType == OP_WRITE))

{
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}

if (pPerIO->nOperationType == OP_READ)
{
pPerIO->buf[dwTrans] = '\0';//在接收缓冲区的末尾添加字符串结束符
send_packet* packet = new send_packet();
packet->len = dwTrans;
strcpy_s(packet->data, pPerIO->buf);
SaveToFile(packet->data, packet->len);
printf(packet->data);//打印接收到的数据


//回复数据
response_packet response;
response.len = sizeof(response_packet);
response.err = 1;
strcpy_s(response.msg, "成功收到!");
WSABUF sendBuf;
sendBuf.buf = reinterpret_cast<char*>(&response);
sendBuf.len = sizeof(response);
WSASend(pPerHandle->s, &sendBuf, 1, &dwTrans, 0, NULL, NULL);

// 继续投递接收I/O请求
WSABUF buf;
buf.buf = pPerIO->buf;
buf.len = BUFFER_SIZE;
pPerIO->nOperationType = OP_READ;

DWORD nFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwTrans, &nFlags, &pPerIO->ol, NULL);
}
}
return 0;
}


void main()
{
int nPort = 4567;
// 创建完成端口对象,创建工作线程处理完成端口对象中事件
//要求给完成端口指定的并发线程数量= CPU核数+2,业务服务线程数=1;
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int numThreads = sysInfo.dwNumberOfProcessors;
HANDLE hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, numThreads + 2);
::CreateThread(NULL, 0, ServerThread, (LPVOID)hCompletion, 0, 0);

// 创建监听套节字,绑定到本地地址,开始监听
SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
//绑定
::bind(sListen, (sockaddr*)&si, sizeof(si));
//开启监听
::listen(sListen, 5);

// 循环处理到来的连接
while (TRUE)
{

// 等待接受未决(新)的连接请求
SOCKADDR_IN saRemote;
int nRemoteLen = sizeof(saRemote);
SOCKET sNew = ::accept(sListen, (sockaddr*)&saRemote, &nRemoteLen);
if (sNew == INVALID_SOCKET)
{
int errorCode = WSAGetLastError();
printf("accept failed, error code=%d\n", errorCode);
continue;
}
// 接受到新连接之后,为它创建一个per-handle数据,并将它们关联到完成端口对象。
//创建一个per-handle对象
PPER_HANDLE_DATA pPerHandle =
(PPER_HANDLE_DATA)::GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
//设置per-handle对象的目标套接字为sNew 设置客户方的地址信息(初始化)
pPerHandle->s = sNew;
memcpy(&pPerHandle->addr, &saRemote, nRemoteLen);

//将新接收的客户端连接绑定到完成端口对象hCompletion上
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (ULONG_PTR)pPerHandle, 0);

// 投递一个接收请求
//分配一个重叠请求包
PPER_IO_DATA pPerIO = (PPER_IO_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
pPerIO->nOperationType = OP_READ;//初始化为读操作
WSABUF buf;
buf.buf = pPerIO->buf;//pPerIO->buf中存储了在异步 I/O 操作中读取或写入的数据。
buf.len = BUFFER_SIZE;

DWORD dwRecv;
DWORD dwFlags = 0;
WSARecv(pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIO->ol, NULL);

}
}


文章作者: Yolo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Yolo !
评论
  目录