refactor(lwip): Refactor LWIP UDP sync

This commit is contained in:
Dong Heng
2018-10-06 10:38:49 +08:00
parent 9e86bda3d0
commit 34ae970f82
9 changed files with 401 additions and 126 deletions

View File

@ -757,8 +757,10 @@ int lwip_close(int s)
{
int ret;
#if ESP_UDP && LWIP_NETIF_TX_SINGLE_PBUF
udp_sync_close(s);
#if ESP_UDP
struct lwip_sock *sock = get_socket(s);
if (sock)
udp_sync_close_netconn(sock->conn);
#endif
_sock_set_open(s, 0);

View File

@ -1416,7 +1416,7 @@ lwip_netconn_do_send(void *m)
#if LWIP_UDP
case NETCONN_UDP:
#if ESP_UDP
udp_sync_regitser(msg);
udp_sync_regitser_sock(msg);
#endif /* ESP_UDP */
#if LWIP_CHECKSUM_ON_COPY
if (ip_addr_isany(&msg->msg.b->addr) || IP_IS_ANY_TYPE_VAL(msg->msg.b->addr)) {
@ -1442,7 +1442,7 @@ lwip_netconn_do_send(void *m)
}
}
#if ESP_UDP
udp_sync_ack(msg);
udp_sync_ack_sock(msg);
#else
TCPIP_APIMSG_ACK(msg);
#endif /* ESP_UDP */

View File

@ -630,8 +630,8 @@ lwip_close(int s)
#endif /* LWIP_IGMP */
#ifndef SOCKETS_MT
#if ESP_UDP && LWIP_NETIF_TX_SINGLE_PBUF
udp_sync_close(s);
#if ESP_UDP
udp_sync_close_netconn(sock->conn);
#endif
#endif

View File

@ -865,6 +865,10 @@ udp_sendto_if_src_chksum(struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *d
ttl = pcb->ttl;
#endif /* LWIP_MULTICAST_TX_OPTIONS */
#if ESP_UDP
udp_sync_cache_udp(pcb);
#endif
LWIP_DEBUGF(UDP_DEBUG, ("udp_send: UDP checksum 0x%04"X16_F"\n", udphdr->chksum));
LWIP_DEBUGF(UDP_DEBUG, ("udp_send: ip_output_if (,,,,0x%02"X16_F",)\n", (u16_t)ip_proto));
/* output to IP */
@ -875,6 +879,10 @@ udp_sendto_if_src_chksum(struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *d
/* @todo: must this be increased even if error occurred? */
MIB2_STATS_INC(mib2.udpoutdatagrams);
#if ESP_UDP
udp_sync_clear_udp();
#endif
/* did we chain a separate header pbuf earlier? */
if (q != p) {
/* free the header pbuf */
@ -1116,6 +1124,11 @@ udp_remove(struct udp_pcb *pcb)
}
}
}
#if ESP_UDP
udp_sync_close_udp(pcb);
#endif
memp_free(MEMP_UDP_PCB, pcb);
}

View File

@ -77,6 +77,8 @@ struct udp_pcb;
typedef void (*udp_recv_fn)(void *arg, struct udp_pcb *pcb, struct pbuf *p,
const ip_addr_t *addr, u16_t port);
typedef int (*udp_cb_fn)(void *arg, int free);
/** the UDP protocol control block */
struct udp_pcb {
/** Common members of all PCB types */
@ -106,6 +108,15 @@ struct udp_pcb {
udp_recv_fn recv;
/** user-supplied argument for the recv callback */
void *recv_arg;
#if ESP_UDP
/* UDP PCB will be clear to "0" when call udp_new() */
/* UDP sync callback function mainly used for resend or active up level task */
udp_cb_fn cb;
/* UDP sync callback function private data */
void *arg;
#endif
};
/* udp_pcbs export for external reference (e.g. SNMP agent) */
extern struct udp_pcb *udp_pcbs;

View File

@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "lwipopts.h"
#if ESP_UDP
#include <stddef.h>
#include <string.h>
#include <stdbool.h>
@ -20,12 +24,13 @@
#include "lwip/priv/api_msg.h"
#include "lwip/priv/tcp_priv.h"
//#define LOG_LOCAL_LEVEL ESP_LOG_VERBOSE
#include "esp_log.h"
#if ESP_UDP && LWIP_NETIF_TX_SINGLE_PBUF
#define UDP_SYNC_SOCK_RETRY_MAX CONFIG_ESP_UDP_SYNC_RETRY_MAX
#define UDP_SYNC_MAX MEMP_NUM_NETCONN
#define UDP_SYNC_RETRY_MAX CONFIG_ESP_UDP_SYNC_RETRY_MAX
#define UDP_SYNC_UDP_RETRY_MAX 64
/*
* All function has no mutex, so they must put into one task(LWIP main task).
@ -37,146 +42,178 @@
#define TCPIP_APIMSG_ACK(m) do { NETCONN_SET_SAFE_ERR((m)->conn, (m)->err); sys_sem_signal(LWIP_API_MSG_SEM(m)); } while(0)
#endif /* LWIP_TCPIP_CORE_LOCKING */
typedef struct udp_sync {
struct api_msg *msg;
#define UDP_SYNC_NONE 0
#define UDP_SYNC_SOCK 1
#define UDP_SYNC_UDP 2
struct netif *netif;
typedef struct udp_sync_method {
uint8_t type;
void (*free) (void *p);
} udp_sync_method_t;
typedef struct udp_sync_udp {
udp_sync_method_t method;
int8_t ret;
uint8_t retry;
} udp_sync_t;
struct pbuf *pbuf;
struct netif *netif;
struct udp_pcb *pcb;
} udp_sync_udp_t;
typedef struct udp_sync_sock {
udp_sync_method_t method;
int8_t ret;
uint8_t retry;
struct api_msg *msg;
struct netif *netif;
} udp_sync_sock_t;
typedef struct udp_sync_netconn {
sys_sem_t *sem;
struct netconn *conn;
} udp_sync_netconn_t;
static const char *TAG = "udp_sync";
static void *s_cur_msg;
static size_t s_msg_type;
static size_t s_udp_sync_num;
static udp_sync_t s_udp_sync[UDP_SYNC_MAX];
static struct api_msg *s_cur_msg;
/*
* @brief initialize UDP sync module
*/
void udp_sync_init(void)
static inline int _udp_need_proc(struct udp_pcb *pcb)
{
memset(s_udp_sync, 0, sizeof(s_udp_sync));
s_udp_sync_num = 0;
return pcb->cb != NULL;
}
/*
* @brief register a UDP API message(struct api_msg) to module
*/
void udp_sync_regitser(void *in_msg)
static inline int _udp_do_proc(struct udp_pcb *pcb, int force)
{
s_cur_msg = in_msg;
return pcb->cb(pcb->arg, force);
}
struct api_msg *msg = (struct api_msg *)in_msg;
int s = msg->conn->socket;
static inline void *_udp_priv_data(struct udp_pcb *pcb)
{
return pcb->arg;
}
if (s < 0 || s >= UDP_SYNC_MAX) {
ESP_LOGE(TAG, "UDP sync register error, socket is %d", s);
} else if (s_udp_sync[s].msg) {
ESP_LOGE(TAG, "UDP sync register error, msg is %p", s_udp_sync[s].msg);
}
static inline void _udp_end_proc(struct udp_pcb *pcb)
{
pcb->cb = NULL;
pcb->arg = NULL;
s_udp_sync_num--;
}
static inline void _udp_add_proc(struct udp_pcb *pcb, udp_cb_fn cb, void *arg)
{
s_udp_sync_num++;
s_udp_sync[s].ret = ERR_OK;
s_udp_sync[s].retry = 0;
s_udp_sync[s].msg = msg;
pcb->cb = cb;
pcb->arg = arg;
}
static void _udp_sync_ack_ret(int s, struct api_msg *msg)
static inline struct udp_pcb *_get_msg_pcb(struct api_msg *msg)
{
return msg->conn->pcb.udp;
}
static void _udp_sync_do_meth_free(udp_sync_method_t *meth)
{
meth->free(meth);
}
static int _udp_sync_ack_sock_ret(struct api_msg *msg, int force)
{
int ret;
struct udp_pcb *pcb = msg->conn->pcb.udp;
udp_sync_sock_t *udp_sync_sock = _udp_priv_data(pcb);
/* Only cache when low-level has no buffer to send packet */
if (s_udp_sync[s].ret != ERR_MEM || s_udp_sync[s].retry >= UDP_SYNC_RETRY_MAX) {
if (force || !udp_sync_sock || udp_sync_sock->ret != ERR_MEM || udp_sync_sock->retry >= UDP_SYNC_SOCK_RETRY_MAX) {
ESP_LOGD(TAG, "UDP sync ret %d retry %d", s_udp_sync[s].ret, s_udp_sync[s].retry);
s_udp_sync[s].msg = NULL;
s_udp_sync[s].retry = 0;
s_udp_sync[s].ret = ERR_OK;
s_udp_sync_num--;
if (udp_sync_sock) {
ESP_LOGD(TAG, "UDP sync sock ret %d retry %d", udp_sync_sock->ret, udp_sync_sock->retry);
}
TCPIP_APIMSG_ACK(msg);
ret = ERR_OK;
} else {
s_udp_sync[s].retry++;
ESP_LOGD(TAG, "UDP sync ack error, errno %d", s_udp_sync[s].ret);
udp_sync_sock->retry++;
ESP_LOGD(TAG, "UDP sync sock ack error, errno %d", udp_sync_sock->ret);
ret = ERR_INPROGRESS;
}
return ret;
}
/*
* @brief ack the message
*/
void udp_sync_ack(void *in_msg)
static void _udp_sync_meth_sock_free(void *p)
{
struct api_msg *msg = (struct api_msg *)in_msg;
int s = msg->conn->socket;
if (s < 0 || s >= UDP_SYNC_MAX) {
ESP_LOGE(TAG, "UDP sync ack error, socket is %d", s);
} else if (!s_udp_sync[s].msg) {
ESP_LOGE(TAG, "UDP sync ack error, msg is NULL");
}
_udp_sync_ack_ret(s, msg);
s_cur_msg = NULL;
heap_caps_free(p);
}
/*
* @brief set the current message send result
*/
void udp_sync_set_ret(void *netif, int ret)
static void _udp_sync_meth_udp_free(void *p)
{
/* Only poll and regitser can set current message */
if (!s_cur_msg || !sys_current_task_is_tcpip()) {
/* You may use it to debug */
//ESP_LOGE(TAG, "UDP sync ack error, current message is %p, task name is %s", s_cur_msg, sys_current_task_name());
return ;
}
udp_sync_udp_t *udp_sync_udp = (udp_sync_udp_t *)p;
struct api_msg *msg = s_cur_msg;
int s = msg->conn->socket;
if (s < 0 || s >= UDP_SYNC_MAX) {
ESP_LOGE(TAG, "UDP sync ack error, socket is %d", s);
} else if (!s_udp_sync[s].msg) {
ESP_LOGE(TAG, "UDP sync ack error, msg is NULL");
}
s_udp_sync[s].netif = netif;
s_udp_sync[s].ret = ret;
pbuf_free(udp_sync_udp->pbuf);
heap_caps_free(udp_sync_udp);
}
static void udp_sync_send(struct api_msg *msg)
static int sock_udp_sync_sock_cb(void *p, int force)
{
struct pbuf *p = msg->msg.b->p;
int s = msg->conn->socket;
struct netif *netif = s_udp_sync[s].netif;
int ret;
udp_sync_sock_t *udp_sync_sock = (udp_sync_sock_t *)p;
struct api_msg *msg = udp_sync_sock->msg;
struct netif *netif = udp_sync_sock->netif;
struct pbuf *pbuf = msg->msg.b->p;
s_cur_msg = msg;
s_msg_type = UDP_SYNC_SOCK;
netif->linkoutput(netif, p);
_udp_sync_ack_ret(s, msg);
netif->linkoutput(netif, pbuf);
ret = _udp_sync_ack_sock_ret(msg, force);
s_msg_type = UDP_SYNC_NONE;
s_cur_msg = NULL;
return ret;
}
/*
* @brief process the sync
*/
void udp_sync_proc(void)
static int sock_udp_sync_udp_cb(void *p, int force)
{
if (!s_udp_sync_num)
return ;
int ret;
udp_sync_udp_t *udp_sync_udp = (udp_sync_udp_t *)p;
struct netif *netif = udp_sync_udp->netif;
struct pbuf *pbuf = udp_sync_udp->pbuf;
struct udp_pcb *pcb = udp_sync_udp->pcb;
for (int i = 0; i < UDP_SYNC_MAX; i++) {
if (!s_udp_sync[i].msg)
continue;
s_cur_msg = pcb;
s_msg_type = UDP_SYNC_UDP;
udp_sync_send(s_udp_sync[i].msg);
netif->linkoutput(netif, pbuf);
if (s_udp_sync[i].ret == ERR_MEM)
break;
s_msg_type = UDP_SYNC_NONE;
s_cur_msg = NULL;
if (force || udp_sync_udp->ret != ERR_MEM || udp_sync_udp->retry >= UDP_SYNC_UDP_RETRY_MAX) {
if (udp_sync_udp) {
ESP_LOGD(TAG, "UDP sync sync ret %d retry %d", udp_sync_udp->ret, udp_sync_udp->retry);
}
ret = ERR_OK;
} else {
udp_sync_udp->retry++;
ESP_LOGD(TAG, "UDP sync udp send error, errno %d", udp_sync_udp->ret);
ret = ERR_INPROGRESS;
}
return ret;
}
/*
@ -187,6 +224,167 @@ static void udp_sync_trigger_null(void *p)
}
static void udp_sync_do_close_netconn(void *p)
{
udp_sync_netconn_t *sync = (udp_sync_netconn_t *)p;
struct netconn *conn = sync->conn;
sys_sem_t *sem = sync->sem;
struct udp_pcb *pcb = conn->pcb.udp;
udp_sync_close_udp(pcb);
sys_sem_signal(sem);
}
/*
* @brief ack the message
*/
void udp_sync_regitser_sock(void *in_msg)
{
s_cur_msg = in_msg;
s_msg_type = UDP_SYNC_SOCK;
ESP_LOGD(TAG, "UDP sync regitser sock msg %p", in_msg);
}
/*
* @brief ack the message
*/
void udp_sync_ack_sock(void *in_msg)
{
int ret;
struct api_msg *msg = (struct api_msg *)in_msg;
struct udp_pcb *pcb = _get_msg_pcb(msg);
ret = _udp_sync_ack_sock_ret(msg, 0);
if (ret == ERR_OK && _udp_need_proc(pcb)) {
udp_sync_method_t *method = (udp_sync_method_t *)_udp_priv_data(pcb);
_udp_sync_do_meth_free(method);
_udp_end_proc(pcb);
}
s_msg_type = UDP_SYNC_NONE;
s_cur_msg = NULL;
ESP_LOGD(TAG, "UDP sync ack msg %p", msg);
}
/*
* @brief set the current message send result
*/
void udp_sync_set_ret(void *netif, void *in_pbuf, int ret)
{
struct udp_pcb *pcb;
struct api_msg *msg;
udp_sync_sock_t *udp_sync_sock;
udp_sync_udp_t *udp_sync_udp;
struct pbuf *pbuf;
/* Only poll and regitser can set current message */
if (!s_cur_msg || !sys_current_task_is_tcpip()) {
/* You may use it to debug */
//ESP_LOGE(TAG, "UDP sync ack error, current message is %p, task name is %s", s_cur_msg, sys_current_task_name());
return ;
}
switch (s_msg_type) {
case UDP_SYNC_SOCK:
msg = s_cur_msg;
pcb = _get_msg_pcb(msg);
udp_sync_sock = _udp_priv_data(pcb);
if (udp_sync_sock) {
udp_sync_sock->ret = ret;
ESP_LOGD(TAG, "UDP sync set1 port %d ret %d netif %p", pcb->local_port, ret, netif);
return ;
} else {
if (ERR_OK == ret) {
return ;
}
}
udp_sync_sock = heap_caps_malloc(sizeof(udp_sync_sock_t), MALLOC_CAP_8BIT);
if (!udp_sync_sock) {
ESP_LOGE(TAG, "UDP sync sock regitser error MEM_ERR");
return ;
}
udp_sync_sock->method.free = _udp_sync_meth_sock_free;
udp_sync_sock->method.type = UDP_SYNC_SOCK;
udp_sync_sock->msg = msg;
udp_sync_sock->retry = 0;
udp_sync_sock->netif = netif;
udp_sync_sock->ret = ret;
_udp_add_proc(pcb, sock_udp_sync_sock_cb, udp_sync_sock);
ESP_LOGD(TAG, "UDP sync set2 port %d ret %d netif %p", pcb->local_port, ret, netif);
break;
case UDP_SYNC_UDP:
pcb = s_cur_msg;
udp_sync_udp = _udp_priv_data(pcb);
if (udp_sync_udp) {
udp_sync_udp->ret = ret;
ESP_LOGD(TAG, "UDP sync set3 port %d ret %d netif %p", pcb->local_port, ret, netif);
return ;
} else {
if (ERR_OK == ret)
return ;
}
udp_sync_udp = heap_caps_malloc(sizeof(udp_sync_udp_t), MALLOC_CAP_8BIT);
if (!udp_sync_udp) {
ESP_LOGE(TAG, "UDP sync udp regitser error MEM_ERR");
return ;
}
pbuf = (struct pbuf *)in_pbuf;
pbuf_ref(pbuf);
udp_sync_udp->method.free = _udp_sync_meth_udp_free;
udp_sync_udp->method.type = UDP_SYNC_UDP;
udp_sync_udp->pbuf = pbuf;
udp_sync_udp->retry = 0;
udp_sync_udp->netif = netif;
udp_sync_udp->pcb = pcb;
udp_sync_udp->ret = ret;
_udp_add_proc(pcb, sock_udp_sync_udp_cb, udp_sync_udp);
ESP_LOGD(TAG, "UDP sync set4 port %d ret %d netif %p", pcb->local_port, ret, netif);
break;
default:
break;
}
}
void udp_sync_cache_udp(void *pcb)
{
if (s_cur_msg || !sys_current_task_is_tcpip())
return ;
s_msg_type = UDP_SYNC_UDP;
s_cur_msg = pcb;
ESP_LOGD(TAG, "UDP sync cache udp %p", pcb);
}
void udp_sync_clear_udp(void)
{
if (s_msg_type != UDP_SYNC_UDP || !sys_current_task_is_tcpip())
return ;
struct udp_pcb *pcb = (struct udp_pcb *)s_cur_msg;
s_msg_type = UDP_SYNC_NONE;
s_cur_msg = NULL;
ESP_LOGD(TAG, "UDP sync clear udp %p", pcb);
}
/*
* @brief trigger a UDP sync process
*/
@ -198,28 +396,66 @@ void udp_sync_trigger(void)
tcpip_callback_with_block(udp_sync_trigger_null, NULL, 0);
}
/*
* @brief close and clear the udp sync
*/
static void udp_sync_do_close(void *p)
/**
* @brief close the udp sync before close the socket
*/
void udp_sync_close_udp(void *udp_pcb)
{
int s = (int)p;
struct udp_pcb *pcb = udp_pcb;
if (s_udp_sync[s].msg) {
ESP_LOGD(TAG, "UDP sync close socket %d", s);
s_udp_sync[s].msg = NULL;
s_udp_sync[s].retry = 0;
s_udp_sync[s].ret = ERR_OK;
s_udp_sync_num--;
ESP_LOGD(TAG, "UDP sync close port %d", pcb->local_port);
if (_udp_need_proc(pcb)) {
udp_sync_method_t *method = (udp_sync_method_t *)_udp_priv_data(pcb);
_udp_do_proc(pcb, 1);
_udp_sync_do_meth_free(method);
_udp_end_proc(pcb);
ESP_LOGD(TAG, "UDP sync free proc port %d", pcb->local_port);
}
}
/**
* @brief close the udp sync before close the socket
* @brief close the udp sync netconn before close the socket
*/
void udp_sync_close(int s)
void udp_sync_close_netconn(void *netconn)
{
tcpip_callback_with_block(udp_sync_do_close, (void *)s, 1);
udp_sync_netconn_t sync;
struct netconn *conn = netconn;
if (conn->type != NETCONN_UDP)
return ;
sync.conn = netconn;
sync.sem = sys_thread_sem_get();
tcpip_callback(udp_sync_do_close_netconn, &sync);
sys_arch_sem_wait(sync.sem, 0);
}
/*
* @brief process the sync
*/
void udp_sync_proc(void)
{
struct udp_pcb *pcb;
if (!s_udp_sync_num)
return ;
for (pcb = udp_pcbs; pcb != NULL; pcb = pcb->next) {
udp_sync_method_t *method;
if (!_udp_need_proc(pcb) || _udp_do_proc(pcb, 0) != ERR_OK)
continue;
method = (udp_sync_method_t *)_udp_priv_data(pcb);
_udp_sync_do_meth_free(method);
_udp_end_proc(pcb);
ESP_LOGD(TAG, "UDP sync end proc port %d", pcb->local_port);
}
}
#endif /* ESP_UDP */

View File

@ -2224,8 +2224,8 @@ void *memp_malloc_ll(size_t type);
*/
#if ESP_UDP
#if !LWIP_UDP || !LWIP_SOCKET || !ESP_LWIP
#error "LWIP_UDP & LWIP_SOCKET & ESP_LWIP must be enable"
#if !LWIP_UDP || !LWIP_SOCKET || !ESP_LWIP || !LWIP_NETIF_TX_SINGLE_PBUF
#error "LWIP_UDP & LWIP_SOCKET & ESP_LWIP & LWIP_NETIF_TX_SINGLE_PBUF must be enable"
#else
#include "udp_sync.h"
#endif

View File

@ -29,26 +29,30 @@ void udp_sync_init(void);
*
* @param in_msg message pointer
*/
void udp_sync_regitser(void *in_msg);
void udp_sync_regitser_sock(void *in_msg);
/*
* @brief ack the message
*
* @param in_msg message pointer
*/
void udp_sync_ack(void *in_msg);
void udp_sync_ack_sock(void *in_msg);
/*
* @brief set the current message send result
*
*
* @param netif LwIP netif pointer
* @param pbuf low-level netif output pbuf pointer
* @param ret current message send result
*/
void udp_sync_set_ret(void *netif, int ret);
void udp_sync_set_ret(void *netif, void *pbuf, int ret);
void udp_sync_cache_udp(void *pcb);
void udp_sync_clear_udp(void);
/*
* @brief process the sync
*
* @param ret current message send result
*/
void udp_sync_proc(void);
@ -58,9 +62,18 @@ void udp_sync_proc(void);
void udp_sync_trigger(void);
/**
* @brief close the udp sync before close the socket
* @brief close the udp pcb
*
* @param udp_pcb LwIP raw UDP pcb pointer
*/
void udp_sync_close(int s);
void udp_sync_close_udp(void *udp_pcb);
/**
* @brief close the udp sync netconn before close the socket
*
* @param netconn LwIP raw netconn pointer
*/
void udp_sync_close_netconn(void *netconn);
#ifdef __cplusplus
}

View File

@ -345,7 +345,7 @@ static int8_t low_level_output(struct netif* netif, struct pbuf* p)
*/
err = esp_aio_sendto(&aio, NULL, 0);
#if ESP_UDP
udp_sync_set_ret(netif, err);
udp_sync_set_ret(netif, p, err);
#endif
if (err != ERR_OK) {