First look at libcurl multi: implementing an HTTP request-processing client, with one thread sending for its life and another coughing up blood collecting replies



I. Introduction

Recently at work, I ran into the following requirement:

We want a high-performance HTTP request-processing client, with this architecture:

  1. It has two threads.
  2. One thread receives batches of HTTP request messages sent by business programs through a message queue, and issues those HTTP requests in bulk.
  3. The other thread receives the external HTTP replies and puts the reply messages into a local message queue for the business programs to consume.
  4. While the HTTP requests are being processed, block as little as possible, for performance.

The framework of this requirement looks like this:

n business programs → message queue (send bulk HTTP request messages) → http client (libcurl multi: issue a batch of HTTP requests, process the batch of HTTP replies) → n HTTP server programs; the servers' response messages are then forwarded back through the message queue to the n business programs.

As you can see, our need is essentially to serve the HTTP request-processing demands of a large number of business programs on the platform. There is more than one business program, and more than one HTTP server they want to reach, so the HTTP request client we are building is really a gateway-like middle layer.

So here is the problem: how do we make the http client program high-performance? We certainly don't want it to block on a single request, which would be very, very slow; nor do we want it to spend too much time processing responses, which would also be very, very slow.

At this point, I remembered a sentence I read on the official website when I first started learning libcurl:

The multi interface allows a single-threaded application to perform the same kinds of multiple, simultaneous transfers that multi-threaded programs can perform. It allows many of the benefits of multi-threaded transfers without the complexity of managing and synchronizing many threads.

In particular, the most critical function for executing requests, curl_multi_perform, is asynchronous:

curl_multi_perform is asynchronous. It will only perform what can be done now and then return back control to your program. It is designed to never block. You need to keep calling the function until all transfers are completed.

This interface fits our current scenario very well: when the http client issues a batch of HTTP requests, the libcurl multi interface gives us non-blocking, asynchronous processing.

So how do we design and implement it?

Here I referred to the code framework of an official libcurl example, 10-at-a-time.c, and implemented two designs (which differ slightly in approach). And because the libcurl in my work environment is an old version of the library (one without the curl_multi_wait function), I wrote both a new-version implementation (using curl_multi_wait) and an old-version one (without curl_multi_wait).

Let's start :)

ps:
1. The official 10-at-a-time.c sample code is at https://curl.haxx.se/libcurl/c/10-at-a-time.html
2. All the implementation code in this post is hosted on GitHub, at https://github.com/wangying2016/libcurl_multi_http_client

II. Analysis: what 10-at-a-time.c achieves

Before designing and implementing our own http client program, let's first look at what this official demo that inspired us so much, 10-at-a-time.c, actually does.

The 10-at-a-time.c code differs a little across libcurl versions. Here I only analyze the latest version, which uses curl_multi_wait (this version of the code is also quite concise); the older versions do the same thing, just in a different way. (If, like me, you also need compatibility with older versions of libcurl, I suggest downloading curl-7.20.0.tar from the official website; the docs/examples/10-at-a-time.c inside shows the old approach.)

You can read the 10-at-a-time.c code at https://curl.haxx.se/libcurl/c/10-at-a-time.html. Let me briefly analyze the design of this program.

1. What the 10-at-a-time.c program does

Compile it:

$ gcc -o 10-at-a-time 10-at-a-time.c -lcurl

This gives you the 10-at-a-time executable, which you can then run:

$ ./10-at-a-time

You will see that this program issues a large number of HTTP requests. What is the concurrency, specifically? 10, which is also the source of its name.

So how does it work?

2. Frame analysis of 10-at-a-time.c

I won't waste words here; let's go straight to the flow (details such as whether transfers is still less than the total url count, or whether a message was processed successfully, are not reflected here; read the code carefully for those):

The outer loop
Use curl_multi_perform to issue HTTP requests in bulk, and curl_multi_wait to poll the readable state of all request handles. The outer loop mainly issues requests and polls the request handles' state:

  1. Start: add the first 10 url requests
  2. curl_multi_perform: issue the HTTP requests in bulk
  3. Run the inner loop to process responses and add new requests
  4. curl_multi_wait: poll all request handles
  5. If there are still active or outstanding requests, go back to step 2; otherwise end

The inner loop
Use curl_multi_info_read to read reply message structures, curl_easy_getinfo to get the details, curl_multi_remove_handle to delete the handle of a processed request, and (inside the add_transfer function) curl_multi_add_handle to add a new request:

  1. curl_multi_info_read: read a reply message
  2. curl_easy_getinfo: get the details of the reply
  3. curl_multi_remove_handle: delete the request handle
  4. curl_multi_add_handle: add a new request
  5. If there are more messages to process, go back to step 1; otherwise the inner loop ends

Overall
In 10-at-a-time.c, the outer loop issues HTTP requests in bulk and polls the request handles' state, while the inner loop processes the HTTP replies, deleting finished requests and adding new ones as it goes.

With that, 10-at-a-time.c achieves highly concurrent HTTP requests, and can also dynamically add requests while processing.

It's as if you can burn 10 sticks of firewood at the same time, while automatically clearing out the burnt ones and adding fresh ones!

That's what I want :)

III. Scheme 1: thread one sends and receives in bulk, thread two processes the replies that thread one caches globally

10-at-a-time.c is a single-threaded program. Turning it into a two-thread framework took a little thought; I came up with the following scheme:

Thread one is responsible for the bulk sending and receiving of HTTP requests, storing received replies into a global variable; thread two is responsible for actually processing the reply messages that thread one stored in the global variable.

Since a global variable is used to store the reply messages, synchronization between the two threads must be handled properly (a mutex). In addition, I use a very simple way to record whether a reply has been processed:

Define a reply message structure containing a variable consumable: 1 means the received reply can be processed, 2 means processing is finished.

1. Storage structure

First, the requests are defined in a global variable:

char *urls[] = {
    "https://www.microsoft.com",
    "https://opensource.org",
    ...
};

To record the reply information, I defined a reply message structure. It does not store the response body itself, because that would be large; it simply records whether the reply can be processed by thread two (that is, when a reply is received, consumable is set to 1; when thread two sees consumable as 1 it processes the reply and sets it to 2):

// response struct
struct res_info {
    int index;
    // 0: initial 1: can consume 2: consume end
    int consumable;
    char url[MAX_URL_LENGTH];
};

Here index records the url's position in the global urls array, and consumable records whether the reply message can be processed: 0 is the initial state, 1 is the state thread one sets when it adds the reply to the global variable for thread two to process, and 2 is the state thread two sets when it has finished.

// response
struct res_info res_list[NUM_URLS];

res_list is the global reply-message store, and NUM_URLS is the total number of urls (this is just a demo; later it could easily be extended into a linked list or queue for dynamic growth). In thread two, I just traverse res_list, process the reply messages whose consumable is 1, and set them to 2 afterwards to mark them done.

2. Helper functions

The function that adds a request
In thread one's inner loop, after a request has been processed, a new request is added (if there are outstanding requests left). The key call here is curl_multi_add_handle. It is worth noting that I record the url in the request's private data, so that after getting the reply I can easily retrieve the url.

// add request
static void add_transfer(CURLM *cm, int i)
{
    CURL *eh = curl_easy_init();
    curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(eh, CURLOPT_URL, urls[i]);
    curl_easy_setopt(eh, CURLOPT_TIMEOUT, 10L);
    curl_easy_setopt(eh, CURLOPT_PRIVATE, urls[i]);
    curl_multi_add_handle(cm, eh);
}

The function that looks up a request's index
This function lets a thread look up a url's position in the global request list. The index is mainly used for logging, so I can report which url has been processed, and so on.

// find index
int find_index_of_urls(char *url) {
    int i;
    for (i = 0; i < NUM_URLS; ++i) {
        if (strcmp(urls[i], url) == 0)
            return i;
    }
    return 0; /* note: "not found" is indistinguishable from index 0 */
}

The function that adds a reply message
When thread one receives a reply, it builds a new res_info structure, sets consumable to 1 to mark it processable, records index and url, and finally writes this res_info into the global res_list reply-message list.

// add response
void add_response(struct res_info res) {
    pthread_mutex_lock(&res_mutex);
    res_list[res.index] = res;
    res_list[res.index].consumable = 1;
    // sleep(1);
    pthread_mutex_unlock(&res_mutex);
}

The function that processes reply messages
Thread two traverses the global res_list reply-message list, checks whether there are any messages that can be processed, and if so processes them and sets consumable to 2 to mark them done.

// consume response
void consume_response() {
    int i;
    pthread_mutex_lock(&res_mutex);
    for (i = 0; i < NUM_URLS; ++i) {
        if (res_list[i].consumable == 1) {
            printf("Log: t2, consume response, index [%d], url [%s]\n",
                   res_list[i].index, res_list[i].url);
            res_list[i].consumable = 2;
        }
    }
    pthread_mutex_unlock(&res_mutex);
}

The function that checks whether everything has been processed
Thread two uses this function to check that all reply messages have been processed.

// check consume finished
int check_consume_finished() {
    int i, finished;
    finished = 1;
    pthread_mutex_lock(&res_mutex);
    for (i = 0; i < NUM_URLS; ++i) {
        if (res_list[i].consumable != 2) {
            finished = 0;
            break;
        }
    }
    pthread_mutex_unlock(&res_mutex);
    return finished;
}

3. Thread functions

Thread one
In thread one, HTTP requests are sent in bulk, the received replies are processed, and the reply messages are written into the global res_list variable.

// t1's thread function
void *fun1() {
    printf("Log: t1 begin...\n");
    CURLM *cm;
    CURLMsg *msg;
    int msgs_left = -1;
    int still_alive = 1;
    curl_global_init(CURL_GLOBAL_ALL);
    cm = curl_multi_init();
    /* Limit the amount of simultaneous connections curl should allow: */
    curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long)MAX_PARALLEL);
    for (transfers = 0; transfers < MAX_PARALLEL; transfers++)
        add_transfer(cm, transfers);
    do {
        curl_multi_perform(cm, &still_alive);
        while ((msg = curl_multi_info_read(cm, &msgs_left))) {
            if (msg->msg == CURLMSG_DONE) {
                struct res_info res;
                char *url;
                CURL *e = msg->easy_handle;
                curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &url);
                strcpy(res.url, url);
                res.index = find_index_of_urls(res.url);
                add_response(res);
                printf("Log: t1, add response, error number [%d], error message [%s], url [%s], "
                       "index [%d]\n", msg->data.result, curl_easy_strerror(msg->data.result),
                       res.url, res.index);
                curl_multi_remove_handle(cm, e);
                curl_easy_cleanup(e);
            }
            else {
                printf("Log: t1, request error, error number [%d]\n", msg->msg);
            }
            if (transfers < NUM_URLS)
                add_transfer(cm, transfers++);
        }
        if (still_alive)
            curl_multi_wait(cm, NULL, 0, 1000, NULL);
    } while (still_alive || (transfers < NUM_URLS));
    curl_multi_cleanup(cm);
    curl_global_cleanup();
    printf("Log: t1 end...\n");
    return NULL;
}

Thread two
Thread two keeps checking the global reply-message list; whenever there is a reply message that can be processed, it processes it, until everything is done.

// t2's thread function
void *fun2() {
    printf("Log: t2 begin...\n");
    do {
        consume_response();
        sleep(1);
    } while (!check_consume_finished());
    printf("Log: t2 end...\n");
    return NULL;
}

With that, we have a simple high-performance HTTP request-processing client demo. See http_client_v1.c in my GitHub repository; http_client_v1_old.c is the version compatible with libcurl libraries that lack curl_multi_wait.

IV. Scheme 2: thread one is responsible only for bulk sending, thread two only for bulk processing

To be honest, the first scheme is not elegant: it divides the work between two threads through a globally cached reply-message list (synchronized with a mutex). In reality, thread one still does all the libcurl work itself: it is in charge of sending, and also of receiving.

So, does libcurl multi support separating the HTTP requesting and the HTTP processing into two threads?

libcurl is thread safe but has no internal thread synchronization. You may have to provide your own locking should you meet any of the thread safety exceptions below.

Handles. You must never share the same handle in multiple threads. You can pass the handles around among threads, but you must never use a single handle from more than one thread at any given time.

Look at the bolded parts: the official libcurl documentation clearly states that you must never use a single handle from more than one thread at a time. That's the trouble: can a libcurl multi handle be shared between two threads at all?

My idea was this: one thread uses the libcurl multi handle to issue requests and add the outstanding ones; the other thread uses the same libcurl multi handle to handle the completed requests and delete them. In principle, thread one only keeps adding requests, i.e. pushing easy handles into the multi handle, without touching the easy handles that already exist, so I hoped this would not affect thread two's processing of the corresponding replies.

So... maybe?!

So, let's try?

T_T, as you say!

1. Storage structure

In this scheme, the global variable that scheme 1 used to hand reply messages between the threads is no longer needed for that purpose, because thread two now processes replies directly.

The reply message structure
This differs a little from scheme 1: consumable as 0 means initial, and consumable as 1 now means processing is finished.

// response struct
struct res_info {
    int index;
    // 0: initial 1: consume end
    int consumable;
    char url[MAX_URL_LENGTH];
};

The libcurl multi handle
It is defined as a global variable, and initialized and cleaned up in the main function:

// multi handle
CURLM *cm;
...
int main() {
    // Initialize multi handle
    curl_global_init(CURL_GLOBAL_ALL);
    cm = curl_multi_init();
    ...
    // Clean multi handle
    curl_multi_cleanup(cm);
    curl_global_cleanup();
    ...
}

A counter of requests currently in flight
I added a variable to record the number of requests currently being processed, so that when thread one adds new requests it does not exceed the configured concurrency limit (the counter is synchronized with its own lock):

// on-going request
unsigned int doing_cnt = 0;
// cnt mutex
pthread_mutex_t cnt_mutex = PTHREAD_MUTEX_INITIALIZER;

2. Thread functions

Thread one
Now thread one does only the work of 10-at-a-time.c's outer loop: keep issuing HTTP requests, poll the request state, and add new requests.

// t1's thread function
void *fun1() {
    printf("Log: t1 begin...\n");
    int still_alive = 1;
    /* Limit the amount of simultaneous connections curl should allow: */
    curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long)MAX_PARALLEL);
    for (transfers = 0; transfers < MAX_PARALLEL; transfers++) {
        add_transfer(transfers);
        printf("Log: t1, add request, index = [%d], url = [%s]\n",
               find_index_of_urls(urls[transfers]), urls[transfers]);
    }
    do {
        curl_multi_perform(cm, &still_alive);
        if (still_alive)
            curl_multi_wait(cm, NULL, 0, 1000, NULL);
        if (transfers < NUM_URLS && doing_cnt < MAX_PARALLEL) {
            add_transfer(transfers);
            printf("Log: t1, add request, index = [%d], url = [%s]\n",
                   find_index_of_urls(urls[transfers]), urls[transfers]);
            transfers++;
        }
        // sleep(1);
    } while (still_alive || (transfers < NUM_URLS));
    printf("Log: t1 end...\n");
    return NULL;
}

Thread two
Thread two does the work of 10-at-a-time.c's inner loop: keep reading the reply messages that can be processed, then delete the finished requests.

// t2's thread function
void *fun2() {
    int msgs_left = -1;
    CURLMsg *msg;
    printf("Log: t2 begin...\n");
    do {
        while ((msg = curl_multi_info_read(cm, &msgs_left))) {
            if (msg->msg == CURLMSG_DONE) {
                char *url;
                CURL *e = msg->easy_handle;
                curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &url);
                struct res_info res;
                strcpy(res.url, url);
                res.index = find_index_of_urls(res.url);
                res.consumable = 1;
                res_list[res.index] = res;
                printf("Log: t2, accept response, index [%d], url [%s], error number [%d], "
                       "error message [%s]\n", res.index, res.url, msg->data.result,
                       curl_easy_strerror(msg->data.result));
                curl_multi_remove_handle(cm, e);
                curl_easy_cleanup(e);
            }
            else {
                printf("Log: t2, request error, error number [%d]\n", msg->msg);
            }
            pthread_mutex_lock(&cnt_mutex);
            doing_cnt--;
            pthread_mutex_unlock(&cnt_mutex);
        }
        // sleep(1);
    } while (!check_consume_finished());
    printf("Log: t2 end...\n");
    return NULL;
}

3. Lingering doubts

Although I changed the code here and implemented scheme 2, I am still confused:

1. I did not put any synchronization lock around the libcurl multi handle, yet no matter how I test it, the code runs fine. Is this one of the "thread safety exceptions" the official website mentions for sharing a libcurl handle across threads?

2. I went against the official libcurl guidance and shared a libcurl multi handle between two threads, and in practice there was no problem. Is that because my thread one only performs add-request operations, so it does not interfere with how thread two uses the handle?

3. Is the scheme 2 code actually broken? At least for now it executes without problems, even when requesting more urls.

I cannot answer these questions at the moment; maybe I need to dig into the libcurl source code in the future to find the answers. I would also welcome answers from anyone with relevant experience; thanks in advance :)

The scheme 2 code is in http_client_v2.c on GitHub; likewise, http_client_v2_old.c is the version compatible with libcurl libraries that lack curl_multi_wait.

Update 2020-02-19: there must be something wrong with this scheme's code

According to the official website:

libcurl is thread safe but has no internal thread synchronization.

libcurl has no internal synchronization, so the high-concurrency behavior of this scheme's code is bound to be problematic: one thread performs add operations on the multi handle while another thread performs delete operations on it, and under heavy parallel operation that is a thread-safety bug.

So the scheme 2 code is only for exploration and trial-and-error; I want to stress the problem here as a warning :)

V. Summary

By studying 10-at-a-time.c, the example code for the libcurl multi interface, I implemented a high-performance HTTP request client with two threads:

1. one thread sending for its life
2. one thread coughing up blood collecting replies

Along the way I gained a deeper understanding of how the libcurl multi interface works. During the exploration I implemented two versions of the code, plus variants compatible with old libcurl libraries that lack the curl_multi_wait function.

This short exploration produced some results, but the longer exploration remains. Along the way I kept finding my store of knowledge too thin; maybe after I learn more about networking and come back to this, I will understand it differently ^_^

To be Stronger:)

Copyright notice
This article was written by [I've been to places that span an hour]. Please keep the original link when reprinting. Thanks!
