开发示例

更新时间:
2024-12-19

开发示例

本节分别以客户端和服务端发送文件为例,介绍如何使用 VSOA Stream 发送文件。

客户端发送文件

下面以客户端发送文件,服务端接收文件为例,介绍 VSOA Stream 发送文件的开发流程和代码示例。客户端发送的文件为执行路径下的 client.send,用户需要自己创建该文件。

开发流程

  1. 客户端首先调用 RPC 接口发送写文件请求,将文件名和文件大小附加到 payload 参数发送给服务端。
  2. 服务端收到该 RPC 请求后,保存文件信息并创建一个用于接收该文件的 Stream 服务端,将 Stream 服务端的通道号通过 RPC 应答形式返回给客户端,启动接收线程,结束当前的 RPC 流程。
  3. 客户端根据服务端返回的流通道创建 Stream 客户端,并循环读取文件发送给服务端,直到发送完成,结束 Stream 客户端。
  4. 服务端循环接收客户端发送的数据保存到本地,直到接收完成,结束当前 Stream 服务并退出线程。

代码示例

代码示例如下,也可以直接下载 client 和 server 的源码。

/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_client_send.c .
*
* Date: 2023-06-28
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"

/* The file name to send. */
#define SEND_FILE "client.send"

#ifndef MAX_PATH
#define MAX_PATH        260
#endif

/* The recv buffer size. */
#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN    1024
#endif

#define CHAR_BUFFER_LEN 128

/*
 * Get the size of a file in bytes.
 *
 * filepath: path of the file to measure.
 *
 * Returns the file size, or (size_t)-1 when the file cannot be opened or
 * its size cannot be determined.
 */
static size_t get_file_size (const char* filepath)
{
    size_t  fsize = (size_t)-1;
    long    pos;
    FILE   *fp;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return  ((size_t)-1);
    }

    /*
     * Only trust ftell() after a successful seek, otherwise the start
     * position (0) would be reported as the file size.
     */
    if (fseek(fp, 0, SEEK_END) == 0) {
        pos = ftell(fp);
        if (pos >= 0) {
            fsize = (size_t)pos;
        }
    }

    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
    }

    fclose(fp);

    return  fsize;
}

/*
 * Read a file and send its whole content to a stream socket.
 *
 * filepath: path of the file to send.
 * stream:   connected stream socket descriptor.
 *
 * Returns 0 when the whole file has been sent (an empty file counts as
 * success), -1 when the file cannot be opened or read, or when sending
 * fails. The file is closed on every path.
 */
static int do_send_file (const char* filepath, int stream)
{
    uint8_t             buffer[MAX_RECV_LEN];
    size_t              read_bytes;
    size_t              sent;
    size_t              total = 0;
    ssize_t             send_bytes;
    FILE               *fp;
    int                 ret = -1;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s, errno is %d.\n", filepath, errno);
        return (-1);
    }

    /* Read the file chunk by chunk and send every chunk completely. */
    for (;;) {
        read_bytes = fread(buffer, 1, sizeof(buffer), fp);
        if (read_bytes == 0) {
            break;                              /* End of file or read error, told apart below. */
        }

        printf("Read bytes:%zu\n", read_bytes);
        sent = 0;

        while (sent < read_bytes) {
            /* Only the part of the chunk that is still pending may be sent. */
            send_bytes = send(stream, buffer + sent, read_bytes - sent, 0);
            if (send_bytes < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                    fprintf(stderr, "Continue send, retry.\n");
                    continue;
                }
                fprintf(stderr, "Invalid fault, errno is %d.\n", errno);
                goto    out;
            }
            if (send_bytes == 0) {
                fprintf(stderr, "Peer closed.\n");
                goto    out;
            }
            sent += (size_t)send_bytes;
            printf("Send %zu bytes.\n", sent);
        }
        total += sent;
    }

    if (ferror(fp)) {
        fprintf(stderr, "Read file %s failed.\n", filepath);
        goto    out;
    }

    printf("Send file %s finished, size %zu Bytes.\n", filepath, total);
    ret = 0;

out:
    fclose(fp);

    return (ret);
}

/*
 * VSOA RPC reply callback of the '/write' url.
 *
 * Checks the server reply, opens a client stream on the tunnel ID carried by
 * the reply header and sends SEND_FILE, located in the current working
 * directory, through that stream. The stream is closed before returning.
 *
 * arg:      user argument of the RPC call, unused.
 * client:   VSOA client handle used to create the stream.
 * vsoa_hdr: reply header; NULL is reported as "server is not responding".
 * payload:  reply payload, unused.
 */
static void write_callback (void *arg, vsoa_client_t *client,
                            vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    int                 tunid;
    int                 status;
    int                 len;
    struct timespec     timeout;
    int                 stream;
    char                cwd[MAX_PATH];
    char                filepath[MAX_PATH];

    (void)arg;
    (void)payload;

    /* Check if server response is valid */
    if (!vsoa_hdr) {
        fprintf(stderr, "Server is not responding.\n");
        return;
    }

    status = vsoa_parser_get_status(vsoa_hdr);
    if (status != 0) {
        fprintf(stderr, "Server reply status is %d.\n", status);
        return;
    }

    /*
     * Build the full file path before opening the stream.
     * The directory goes into its own buffer: snprintf() must not read from
     * the buffer it is writing to.
     */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        return;
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long.\n", cwd, SEND_FILE);
        return;
    }
    printf("The full file path to send is %s.\n", filepath);

    tunid = vsoa_parser_get_tunid(vsoa_hdr);

    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    /* Create vsoa client stream */
    stream = vsoa_client_stream_create(client, tunid, &timeout, 0);
    if (stream <= 0) {
        fprintf(stderr, "vsoa_client_stream_create: tunid=%d, failed\n", tunid);
        return;
    }

    if (do_send_file(filepath, stream) < 0) {
        fprintf(stderr, "Send file %s failed.\n", filepath);
    }

    vsoa_client_stream_close(stream);
}

/*
 * VSOA client connection callback.
 *
 * Once connected, sends a '/write' RPC request whose param is a JSON text
 * carrying the name and the size of SEND_FILE; the reply is handled by
 * write_callback(). Nothing is requested when the file size cannot be
 * determined.
 *
 * arg:     user argument, unused.
 * cliauto: client auto robot the connection belongs to.
 * connect: false is reported as a failed connection and nothing is sent.
 * info:    unused.
 */
static void connect_callback (void *arg, vsoa_client_auto_t *cliauto,
                              bool connect, const char *info)
{
    vsoa_client_t       *client;
    vsoa_url_t           url;
    vsoa_payload_t       payload;
    char                 cwd[MAX_PATH];
    char                 filepath[MAX_PATH];
    char                 fileinfo[CHAR_BUFFER_LEN];
    size_t               fsize;
    int                  len;

    (void)arg;
    (void)info;

    if (!connect) {
        fprintf(stderr, "Cannot connect to VSOA stream server.\n");
        return;
    }

    printf("Connected to VSOA stream server.\n");

    /* Get client handle. */
    client  = vsoa_client_auto_handle(cliauto);

    /* The directory needs its own buffer, snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        return;
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long.\n", cwd, SEND_FILE);
        return;
    }

    /* Do not announce a file whose size is unknown. */
    fsize = get_file_size(filepath);
    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s, nothing to send.\n", filepath);
        return;
    }

    len = snprintf(fileinfo, sizeof(fileinfo), "{\"name\":\"%s\", \"size\":%zu}", SEND_FILE, fsize);
    if (len < 0 || (size_t)len >= sizeof(fileinfo)) {
        fprintf(stderr, "File information is too long.\n");
        return;
    }

    /* Set rpc call url to '/write' */
    url.url     = "/write";
    url.url_len = strlen(url.url);
    payload.data = NULL;
    payload.data_len = 0;
    payload.param = fileinfo;
    payload.param_len = (size_t)len;
    if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET,
                          &url, &payload, write_callback, NULL, NULL)) {
        fprintf(stderr, "RPC call failed: %s (%d).\n", strerror(errno), errno);
    }
}

/*
 * Entry of the file sending client.
 *
 * Creates a client auto robot, registers connect_callback() and starts the
 * robot against "vsoa://vsoa_stream_server", then sleeps forever while the
 * callbacks do the work.
 *
 * Returns -1 when the robot cannot be created, set up or started.
 */
int main(int argc, char *argv[])
{
    vsoa_client_auto_t  *cliauto;

    (void)argc;
    (void)argv;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /* Create client auto robot */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /*
     * Add a connection callback
     * The callback is called automatically when client is connected to the server.
     */
    if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
                                NULL, NULL, 0, 1000, 1000, 1000)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Client auto start failed: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /* Keep the process alive. */
    while (true) {
        sleep(1);
    }

    return  (0);
}
/*
 * end
 */

/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_server_recv.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"
#include "yyjson.h"

#ifndef MAX_PATH
#define MAX_PATH 260
#endif

/* The recv buffer size. */
#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN 1024
#endif

/*
 * Information about the file to receive, filled by file_data_json_parse().
 */
struct file_data {
    char    filename[MAX_PATH];         /* Value of "name": the file name.          */
    size_t  size;                       /* Value of "size": the file size in bytes. */
    char    md5[64];                    /* Value of "md5", when it is present.      */
};

/*
 * Context handed to the receiving thread; the thread frees it when done.
 */
struct file_recv_ctx {
    struct file_data      fileinfo;     /* The file to receive.                */
    vsoa_server_stream_t *stream;       /* The server stream to receive from.  */
};

/*
 * Parse a JSON text into a struct file_data.
 *
 * file_info: output, may be partially written when parsing fails.
 * str:       JSON text, not necessarily NUL terminated.
 * len:       length of the JSON text in bytes.
 *
 * "name" must be a non-empty string without '/' that fits into
 * file_info->filename, "size" must be an unsigned integer that fits into
 * size_t. "md5" is optional; when absent file_info->md5 is an empty string.
 *
 * Returns 0 on success, -1 when the text is not valid JSON or a field is
 * missing or invalid.
 */
int file_data_json_parse (struct file_data *file_info, const char *str, size_t len)
{
    yyjson_doc  *doc;
    yyjson_val  *root, *item;
    const char  *text;
    size_t       text_len;
    uint64_t     num;
    int          ret = -1;

    doc = yyjson_read(str, len, 0);
    if (!doc) {
        /* The text is length delimited, so bound what gets printed. */
        fprintf(stderr, "yyjson_read %.*s failed.\n",
                (int)(len > 1024 ? 1024 : len), str ? str : "");
        return  (-1);
    }

    root = yyjson_doc_get_root(doc);
    if (!root) {
        printf("yyjson_doc_get_root failed.\n");
        goto    out;
    }

    /*
     * The name comes from the peer: it must fit the buffer and must be a
     * plain file name, otherwise it could point outside the working directory.
     */
    item = yyjson_obj_get(root, "name");
    if (!yyjson_is_str(item)) {
        goto    out;
    }
    text     = yyjson_get_str(item);
    text_len = strlen(text);
    if (text_len == 0 || text_len >= sizeof(file_info->filename) || strchr(text, '/')) {
        fprintf(stderr, "Invalid file name.\n");
        goto    out;
    }
    memcpy(file_info->filename, text, text_len + 1);

    item = yyjson_obj_get(root, "size");
    if (!yyjson_is_uint(item)) {
        goto    out;
    }
    num = yyjson_get_uint(item);
    file_info->size = (size_t)num;
    if ((uint64_t)file_info->size != num) {     /* Does not fit into size_t. */
        fprintf(stderr, "Invalid file size.\n");
        goto    out;
    }

    file_info->md5[0] = '\0';
    item = yyjson_obj_get(root, "md5");
    if (yyjson_is_str(item)) {
        text     = yyjson_get_str(item);
        text_len = strlen(text);
        if (text_len >= sizeof(file_info->md5)) {
            fprintf(stderr, "Invalid md5.\n");
            goto    out;
        }
        memcpy(file_info->md5, text, text_len + 1);
    }

    ret = 0;

out:
    yyjson_doc_free(doc);
    return  (ret);
}

/*
 * Receive a file from a stream socket and save it in the current working
 * directory.
 *
 * file_info: name and size (in bytes) of the file to receive.
 * stream:    connected stream socket descriptor.
 *
 * Exactly file_info->size bytes are read from the stream; a size of 0 just
 * creates an empty file.
 *
 * Returns 0 when the whole file has been received and written, -1 when the
 * file cannot be created or written, when receiving fails, or when the peer
 * closes the stream before all bytes arrived.
 */
static int do_recv_file (struct file_data *file_info, int stream)
{
    uint8_t             buffer[MAX_RECV_LEN];
    char                cwd[MAX_PATH];
    char                filepath[MAX_PATH];
    size_t              total;
    size_t              want;
    FILE               *fp;
    int                 len;
    int                 ret = -1;

    /* The directory needs its own buffer, snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno=%d.\n", errno);
        return  (-1);
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, file_info->filename);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long.\n", cwd, file_info->filename);
        return  (-1);
    }

    fp = fopen(filepath, "wb+");
    if (fp == NULL) {
        fprintf(stderr, "Open file %s failed, errno=%d.\n", filepath, errno);
        return  (-1);
    }

    total = file_info->size;
    while (total > 0) {
        /* Never read more than what is still expected, total must not wrap. */
        want = (total < sizeof(buffer)) ? total : sizeof(buffer);
        len  = (int)recv(stream, buffer, want, 0);
        if (len > 0) {
            printf("Received %d bytes from VSOA stream.\n", len);
            if (fwrite(buffer, 1, (size_t)len, fp) != (size_t)len) {
                fprintf(stderr, "Write file %s failed.\n", filepath);
                goto    out;
            }
            total -= (size_t)len;
        } else if (len == 0) {
            printf("Error, peer closed.\n");
            goto    out;
        } else if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) {
            printf("Recv continue.\n");
        } else {
            printf("Recv failed, errno is %d.\n", errno);
            goto    out;
        }
        printf("Total %zu left now.\n", total);
    }

    printf("Recv finished, recv %zu bytes.\n", file_info->size);
    ret = 0;

out:
    /* fclose() flushes the buffered data, a failure means the file is incomplete. */
    if (fclose(fp) != 0 && ret == 0) {
        fprintf(stderr, "Close file %s failed.\n", filepath);
        ret = -1;
    }

    return  (ret);
}

/*
 * Receiving thread: waits up to 5 seconds for a client to connect to the
 * server stream, accepts it and stores the received file with do_recv_file().
 * The stream is closed and the context is freed before the thread exits.
 *
 * arg: the struct file_recv_ctx of this transfer.
 */
static void *thread_read_stream (void *arg)
{
    struct file_recv_ctx   *ctx = (struct file_recv_ctx *)arg;
    struct timeval          wait;
    fd_set                  readable;
    int                     sock;

    wait.tv_usec = 0;
    wait.tv_sec  = 5;

    FD_ZERO(&readable);
    FD_SET(ctx->stream->listenfd, &readable);

    if (select(ctx->stream->listenfd + 1, &readable, NULL, NULL, &wait) <= 0) {
        fprintf(stderr, "Stream has no client connected.\n");
    } else {
        sock = vsoa_server_stream_accept(ctx->stream, NULL, NULL, 0);
        if (sock >= 0 && do_recv_file(&ctx->fileinfo, sock) < 0) {
            fprintf(stderr, "Recv file %s failed.\n", ctx->fileinfo.filename);
        }
    }

    /* Common cleanup for every outcome above. */
    if (ctx->stream) {
        vsoa_server_stream_close(ctx->stream);
    }
    free(ctx);

    return NULL;
}

/*
 * RPC handler of the '/write' command.
 *
 * Parses the file information from the request param, creates a server
 * stream, starts a detached thread_read_stream() thread to receive the file
 * and replies to the client with the tunnel ID of the stream. Every failure
 * is answered with an error status and releases what was acquired.
 *
 * arg:      user argument, unused.
 * server:   VSOA server handle.
 * cid:      ID of the requesting client.
 * vsoa_hdr: request header, provides the sequence number for the reply.
 * url:      request url, unused.
 * payload:  request payload, its param carries the file information.
 */
static void command_write (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                           vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t                seqno           = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t               thread;
    vsoa_server_stream_t   *stream;
    struct file_recv_ctx   *file_recv_ctx;
    int                     tunid;
    int                     err;

    (void)arg;
    (void)url;

    if (!payload || !payload->param || payload->param_len == 0) {
        fprintf(stderr, "Command write without file information.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_ARGUMENTS, seqno, 0, NULL);
        return;
    }

    /* The param is length delimited, do not rely on a NUL terminator. */
    printf("Receive command write, param is %.*s.\n", (int)payload->param_len, payload->param);

    file_recv_ctx = malloc(sizeof(*file_recv_ctx));
    if (file_recv_ctx == NULL) {
        fprintf(stderr, "Malloc file_recv_ctx failed.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    if (file_data_json_parse(&file_recv_ctx->fileinfo, payload->param, payload->param_len) < 0) {
        fprintf(stderr, "Parse file info failed. \n");
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_ARGUMENTS, seqno, 0, NULL);
        return;
    }

    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "command_write Cannot create server stream.\n");
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    file_recv_ctx->stream = stream;

    /*
     * The thread closes the stream when it is done, so remember the tunnel
     * ID now and do not touch the stream after the thread has been started.
     */
    tunid = stream->tunid;

    err = pthread_create(&thread, NULL, thread_read_stream, file_recv_ctx);
    if (err != 0) {
        fprintf(stderr, "Create recv thread failed, error is %d.\n", err);
        vsoa_server_stream_close(stream);
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    pthread_detach(thread);                     /* Nobody joins the thread. */

    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, NULL);

    printf("Replied client with tunnel ID: %d.\n", tunid);
}

/*
 * Entry of the file receiving server.
 *
 * Listens on the port given by the 'VSOA_AUTO_PORT' environment variable
 * (3001 when it is unset, empty or not a valid port), registers
 * command_write() for the '/write' url and then serves requests forever.
 *
 * Returns -1 when the server cannot be created, set up or started.
 */
int main (int argc, char *argv[])
{
    vsoa_server_t      *server;
    vsoa_url_t          url_write;
    uint16_t            server_port = 3001;
    struct sockaddr_in  addr;

    fd_set              fds;
    int                 max_fd;
    int                 count;
    struct timespec     timeout;

    const char         *port_str;
    char               *end;
    long                port;

    (void)argc;
    (void)argv;

    /*
     * Get server port from 'VSOA_AUTO_PORT' environment variable.
     * getenv() leaves the environment untouched, the value is only read.
     */
    port_str = getenv("VSOA_AUTO_PORT");
    if (port_str != NULL && port_str[0] != '\0') {
        port = strtol(port_str, &end, 10);
        if (end != port_str && *end == '\0' && port >= 0 && port <= 65535) {
            server_port = (uint16_t)port;
        } else {
            fprintf(stderr, "Invalid VSOA_AUTO_PORT '%s', use port %u.\n",
                    port_str, (unsigned)server_port);
        }
    }

    /* Set server listen address and port. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return  (-1);
    }

    url_write.url = "/write";
    url_write.url_len = strlen(url_write.url);
    if (!vsoa_server_add_listener(server, &url_write, command_write, NULL)) {
        printf("Cannot add listener of /write.\n");
        vsoa_server_close(server);
        return  (-1);
    }

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return  (-1);
    }

    printf("Started VSOA stream server.\n");

    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    /* Event loop: wait for server input and let the server handle it. */
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return  (0);
}
/*
 * end
 */

服务端发送文件

下面以服务端发送文件,客户端接收文件为例,介绍 VSOA Stream 发送文件的开发流程和代码示例。服务端发送的文件为执行路径下的 server.send,用户需要自己创建该文件。

开发流程

  1. 客户端首先调用 RPC 接口发送读文件请求。
  2. 服务端收到该 RPC 请求后,创建一个用于发送该文件的 Stream 服务端,将 Stream 服务端的通道号通过 RPC 应答形式返回给客户端,并将要发送的文件信息附加到 payload 中。启动发送线程,结束当前的 RPC 流程。
  3. 客户端接收应答,保存要接收的文件信息,并根据服务端返回的流通道创建 Stream 客户端,开始接收文件。
  4. Stream 服务端收到 Stream 客户端连接后,循环读取本地文件并发送,直到发送完成,结束当前 Stream 服务并退出线程。
  5. 客户端接收完文件后保存文件,关闭 Stream 客户端,结束当前 RPC 应答回调。

代码示例

代码示例如下,也可以直接下载 client 和 server 的源码。

/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_client_recv.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#include "yyjson.h"
#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"

#ifndef MAX_PATH 
#define MAX_PATH 260
#endif

#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN 1024
#endif

/*
 * Information about the file to receive, filled by file_data_json_parse().
 */
struct file_data {
    char    filename[MAX_PATH];         /* Value of "name": the file name.          */
    size_t  size;                       /* Value of "size": the file size in bytes. */
    char    md5[64];                    /* Value of "md5", when it is present.      */
};

/*
 * Parse a JSON text into a struct file_data.
 *
 * file_info: output, may be partially written when parsing fails.
 * str:       JSON text, not necessarily NUL terminated.
 * len:       length of the JSON text in bytes.
 *
 * "name" must be a non-empty string without '/' that fits into
 * file_info->filename, "size" must be an unsigned integer that fits into
 * size_t. "md5" is optional; when absent file_info->md5 is an empty string.
 *
 * Returns 0 on success, -1 when the text is not valid JSON or a field is
 * missing or invalid.
 */
static int file_data_json_parse (struct file_data *file_info, const char *str, size_t len)
{
    yyjson_doc  *doc;
    yyjson_val  *root, *item;
    const char  *text;
    size_t       text_len;
    uint64_t     num;
    int          ret = -1;

    doc = yyjson_read(str, len, 0);
    if (!doc) {
        /* The text is length delimited, so bound what gets printed. */
        fprintf(stderr, "yyjson_read %.*s failed.\n",
                (int)(len > 1024 ? 1024 : len), str ? str : "");
        return  (-1);
    }

    root = yyjson_doc_get_root(doc);
    if (!root) {
        printf("yyjson_doc_get_root failed.\n");
        goto    out;
    }

    /*
     * The name comes from the peer: it must fit the buffer and must be a
     * plain file name, otherwise it could point outside the working directory.
     */
    item = yyjson_obj_get(root, "name");
    if (!yyjson_is_str(item)) {
        goto    out;
    }
    text     = yyjson_get_str(item);
    text_len = strlen(text);
    if (text_len == 0 || text_len >= sizeof(file_info->filename) || strchr(text, '/')) {
        fprintf(stderr, "Invalid file name.\n");
        goto    out;
    }
    memcpy(file_info->filename, text, text_len + 1);

    item = yyjson_obj_get(root, "size");
    if (!yyjson_is_uint(item)) {
        goto    out;
    }
    num = yyjson_get_uint(item);
    file_info->size = (size_t)num;
    if ((uint64_t)file_info->size != num) {     /* Does not fit into size_t. */
        fprintf(stderr, "Invalid file size.\n");
        goto    out;
    }

    file_info->md5[0] = '\0';
    item = yyjson_obj_get(root, "md5");
    if (yyjson_is_str(item)) {
        text     = yyjson_get_str(item);
        text_len = strlen(text);
        if (text_len >= sizeof(file_info->md5)) {
            fprintf(stderr, "Invalid md5.\n");
            goto    out;
        }
        memcpy(file_info->md5, text, text_len + 1);
    }

    ret = 0;

out:
    yyjson_doc_free(doc);
    return  (ret);
}

/*
 * Receive a file from a stream socket and save it in the current working
 * directory.
 *
 * file_info: name and size (in bytes) of the file to receive.
 * stream:    connected stream socket descriptor.
 *
 * Exactly file_info->size bytes are read from the stream; a size of 0 just
 * creates an empty file.
 *
 * Returns 0 when the whole file has been received and written, -1 when the
 * file cannot be created or written, when receiving fails, or when the peer
 * closes the stream before all bytes arrived.
 */
static int do_recv_file (struct file_data *file_info, int stream)
{
    int                 len;
    int                 ret = -1;
    FILE               *fp;
    size_t              total;
    size_t              want;
    uint8_t             buffer[MAX_RECV_LEN];
    char                cwd[MAX_PATH];
    char                filepath[MAX_PATH];

    /*
     * Get the full path to recv file.
     * The directory needs its own buffer, snprintf() arguments must not overlap.
     */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d!\n", errno);
        return  (-1);
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, file_info->filename);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long!\n", cwd, file_info->filename);
        return  (-1);
    }
    printf("Recv full filename is %s.\n", filepath);

    fp = fopen(filepath, "wb+");
    if (fp == NULL) {
        fprintf(stderr, "Open file %s failed, errno is %d!\n", filepath, errno);
        return  (-1);
    }

    total = file_info->size;
    while (total > 0) {
        /* Never read more than what is still expected, total must not wrap. */
        want = (total < sizeof(buffer)) ? total : sizeof(buffer);
        len  = (int)recv(stream, buffer, want, 0);
        if (len > 0) {
            printf("Received %d bytes from VSOA stream.\n", len);
            if (fwrite(buffer, 1, (size_t)len, fp) != (size_t)len) {
                fprintf(stderr, "Write file %s failed.\n", filepath);
                goto    out;
            }
            total -= (size_t)len;
        } else if (len == 0) {
            printf("Peer closed.\n");
            goto    out;
        } else if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) {
            printf("Recv continue.\n");
        } else {
            fprintf(stderr, "Recv failed, errno is %d.\n", errno);
            goto    out;
        }
        printf("Total %zu left now.\n", total);
    }

    printf("Recv file %s finished, size is %zu.\n", filepath, file_info->size);
    ret = 0;

out:
    /* fclose() flushes the buffered data, a failure means the file is incomplete. */
    if (fclose(fp) != 0 && ret == 0) {
        fprintf(stderr, "Close file %s failed.\n", filepath);
        ret = -1;
    }

    return  (ret);
}

/*
 * VSOA RPC reply callback of the '/read' url.
 *
 * Validates the reply, parses the file information from the reply param,
 * opens a client stream on the tunnel ID of the reply header and receives
 * the file with do_recv_file(). The stream is closed before returning.
 */
static void read_callback (void *arg, vsoa_client_t *client,
                           vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    struct file_data    info;
    struct timespec     wait;
    int                 reply_status;
    int                 tunnel;
    int                 sock;

    (void)arg;

    /* Without a header there is no reply to look at. */
    if (!vsoa_hdr) {
        fprintf(stderr, "Server is not responding.\n");
        return;
    }

    reply_status = vsoa_parser_get_status(vsoa_hdr);
    if (reply_status != 0) {
        fprintf(stderr, "Server reply status is %d.\n", reply_status);
        return;
    }

    /* The file information travels in the param part of the payload. */
    if (!(payload && payload->param && payload->param_len != 0)) {
        fprintf(stderr, "Server responds invalid payload.\n");
        return;
    }
    if (file_data_json_parse(&info, payload->param, payload->param_len) < 0) {
        fprintf(stderr, "Parse file info failed.\n");
        return;
    }

    wait.tv_nsec = 0;
    wait.tv_sec  = 1;
    tunnel = vsoa_parser_get_tunid(vsoa_hdr);

    sock = vsoa_client_stream_create(client, tunnel, &wait, 0);
    if (sock <= 0) {
        fprintf(stderr, "vsoa_client_stream_create: tunid=%d, failed.\n", tunnel);
        return;
    }

    if (do_recv_file(&info, sock) < 0) {
        fprintf(stderr, "do_recv_file failed.\n");
    }

    /* Success or not, the stream is closed exactly once. */
    vsoa_client_stream_close(sock);
}

/*
 * VSOA client connection callback.
 *
 * When the connection is up, sends a '/read' RPC request without payload;
 * the reply is handled by read_callback().
 */
static void connect_callback (void *arg, vsoa_client_auto_t *cliauto,
                              bool connect, const char *info)
{
    vsoa_url_t           request;
    vsoa_client_t       *handle;

    if (connect) {
        printf("Connected to VSOA stream server.\n");

        handle = vsoa_client_auto_handle(cliauto);

        request.url     = "/read";
        request.url_len = strlen(request.url);

        if (!vsoa_client_call(handle, VSOA_CLIENT_RPC_METHOD_GET,
                              &request, NULL, read_callback, NULL, NULL)) {
            fprintf(stderr, "RPC call failed: %s (%d).\n", strerror(errno), errno);
        }
    } else {
        fprintf(stderr, "Cannot connect to VSOA stream server.\n");
    }
}

/*
 * Entry of the file receiving client.
 *
 * Creates a client auto robot, registers connect_callback() and starts the
 * robot against "vsoa://vsoa_stream_server", then sleeps forever while the
 * callbacks do the work.
 *
 * Returns -1 when the robot cannot be created, set up or started.
 */
int main (int argc, char *argv[])
{
    vsoa_client_auto_t  *cliauto;

    (void)argc;
    (void)argv;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /* Create client auto robot. */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /*
     * Add a connection callback
     * The callback is called automatically when client is connected to the server.
     */
    if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
                                NULL, NULL, 0, 1000, 1000, 1000)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Client auto start failed: %s (%d).\n", strerror(errno), errno);
        return  (-1);
    }

    /* Keep the process alive. */
    while (true) {
        sleep(1);
    }

    return  (0);
}
/*
 * end
 */

/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_server_send.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"
#include "yyjson.h"

/* The send file name. */
#define SEND_FILE "server.send"

/* The send buffer size. */
#ifndef MAX_SEND_LEN
#define MAX_SEND_LEN    1024
#endif

#ifndef MAX_PATH
#define MAX_PATH        260
#endif

#define CHAR_BUFFER_LEN 128

/*
 * Get the size of a file in bytes.
 *
 * filepath: path of the file to measure.
 *
 * Returns the file size, or (size_t)-1 when the file cannot be opened or
 * its size cannot be determined.
 */
static size_t get_file_size (const char* filepath)
{
    size_t  fsize = (size_t)-1;
    long    pos;
    FILE   *fp;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return  ((size_t)-1);
    }

    /*
     * Only trust ftell() after a successful seek, otherwise the start
     * position (0) would be reported as the file size.
     */
    if (fseek(fp, 0, SEEK_END) == 0) {
        pos = ftell(fp);
        if (pos >= 0) {
            fsize = (size_t)pos;
        }
    }

    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
    }

    fclose(fp);

    return  fsize;
}

/*
 * Read a file and send its whole content to a stream socket.
 *
 * filepath: path of the file to send.
 * stream:   connected stream socket descriptor.
 *
 * Returns 0 when the whole file has been sent (an empty file counts as
 * success), -1 when the file cannot be opened or read, or when sending
 * fails. The file is closed on every path.
 */
static int do_send_file(const char* filepath, int stream)
{
    size_t              read_bytes;
    size_t              sent;
    size_t              total           = 0;
    ssize_t             send_bytes;
    char                buffer[MAX_SEND_LEN];
    FILE               *fp;
    int                 ret             = -1;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return  (-1);
    }

    /* Read the file chunk by chunk and send every chunk completely. */
    for (;;) {
        read_bytes = fread(buffer, 1, sizeof(buffer), fp);
        if (read_bytes == 0) {
            break;                              /* End of file or read error, told apart below. */
        }

        printf("Read %zu bytes.\n", read_bytes);
        sent = 0;

        while (sent < read_bytes) {
            /* Only the part of the chunk that is still pending may be sent. */
            send_bytes = send(stream, buffer + sent, read_bytes - sent, 0);
            if (send_bytes < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                    fprintf(stderr, "Continue send, retry.\n");
                    continue;
                }
                fprintf(stderr, "Invalid fault, errno is %d.\n", errno);
                goto    out;
            }
            if (send_bytes == 0) {
                fprintf(stderr, "Peer closed.\n");
                goto    out;
            }
            sent += (size_t)send_bytes;
        }
        total += sent;
    }

    if (ferror(fp)) {
        fprintf(stderr, "Read file %s failed.\n", filepath);
        goto    out;
    }

    printf("Send file %s finished, size %zu Bytes.\n", filepath, total);
    ret = 0;

out:
    fclose(fp);

    return  (ret);
}

/*
 * Sending thread: waits up to 5 seconds for a client to connect to the
 * server stream, accepts it and sends SEND_FILE, located in the current
 * working directory, with do_send_file(). The stream is closed on every
 * path before the thread exits.
 *
 * arg: the vsoa_server_stream_t to serve.
 */
static void *thread_read_stream (void *arg)
{
    vsoa_server_stream_t   *stream = (vsoa_server_stream_t *)arg;
    struct timeval          timeout;
    fd_set                  fds;
    int                     count;
    int                     client;
    int                     len;
    char                    cwd[MAX_PATH];
    char                    filepath[MAX_PATH];

    /* Set client stream connection timeout to 5s. */
    timeout.tv_sec  = 5;
    timeout.tv_usec = 0;

    FD_ZERO(&fds);
    FD_SET(stream->listenfd, &fds);

    count = select(stream->listenfd + 1, &fds, NULL, NULL, &timeout);
    if (count <= 0) {
        fprintf(stderr, "Stream has no client connected.\n");
        goto    out;
    }

    client = vsoa_server_stream_accept(stream, NULL, NULL, 0);
    if (client < 0) {
        fprintf(stderr, "Accept stream client failed.\n");
        goto    out;
    }

    /* The directory needs its own buffer, snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed.\n");
        goto    out;
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long.\n", cwd, SEND_FILE);
        goto    out;
    }

    if (do_send_file(filepath, client) < 0) {
        fprintf(stderr, "Send file failed.\n");
    }

out:
    /* The thread owns the stream, release it whatever happened above. */
    vsoa_server_stream_close(stream);

    return  (NULL);
}

/*
 * RPC handler of the '/read' command.
 *
 * Creates a server stream, starts a detached thread_read_stream() thread to
 * send SEND_FILE and replies to the client with the tunnel ID of the stream
 * and a JSON param carrying the file name and size. Every failure is
 * answered with VSOA_STATUS_NO_MEMORY and releases what was acquired.
 *
 * arg:      user argument, unused.
 * server:   VSOA server handle.
 * cid:      ID of the requesting client.
 * vsoa_hdr: request header, provides the sequence number for the reply.
 * url:      request url, unused.
 * payload:  request payload, unused.
 */
static void command_read(void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                         vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t                seqno       = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t               thread;
    vsoa_server_stream_t   *stream;
    vsoa_payload_t          send;
    char                    fileinfo[CHAR_BUFFER_LEN];
    char                    cwd[MAX_PATH];
    char                    filepath[MAX_PATH];
    size_t                  fsize;
    int                     len;
    int                     tunid;
    int                     err;

    (void)arg;
    (void)url;
    (void)payload;

    /*
     * Collect the file information first, so nothing has to be released
     * when the file is not available. The directory needs its own buffer,
     * snprintf() arguments must not overlap.
     */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (len < 0 || (size_t)len >= sizeof(filepath)) {
        fprintf(stderr, "File path %s/%s is too long.\n", cwd, SEND_FILE);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    /* Do not announce a file whose size is unknown. */
    fsize = get_file_size(filepath);
    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    len = snprintf(fileinfo, sizeof(fileinfo), "{\"name\":\"%s\", \"size\":%zu}", SEND_FILE, fsize);
    if (len < 0 || (size_t)len >= sizeof(fileinfo)) {
        fprintf(stderr, "File information is too long.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "Cannot create server stream.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    printf("vsoa_server_stream_create stream=%p.\n", (void *)stream);

    /*
     * The thread closes the stream when it is done, so remember the tunnel
     * ID now and do not touch the stream after the thread has been started.
     */
    tunid = stream->tunid;

    /* create dedicated thread to write stream */
    err = pthread_create(&thread, NULL, thread_read_stream, stream);
    if (err != 0) {
        fprintf(stderr, "Create send thread failed, error is %d.\n", err);
        vsoa_server_stream_close(stream);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    pthread_detach(thread);                     /* Nobody joins the thread. */

    send.data = NULL;
    send.data_len = 0;
    send.param = fileinfo;
    send.param_len = (size_t)len;
    /* Make a client reply, include filesize and filename. */
    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, &send);

    printf("Replied client with tunnel ID: %d.\n", tunid);
}

/*
 * Entry of the file sending server.
 *
 * Listens on the port given by the 'VSOA_AUTO_PORT' environment variable
 * (3001 when it is unset, empty or not a valid port), registers
 * command_read() for the '/read' url and then serves requests forever.
 *
 * Returns -1 when the server cannot be created, set up or started.
 */
int main (int argc, char *argv[])
{
    vsoa_server_t      *server;
    vsoa_url_t          url_read;
    uint16_t            server_port = 3001;
    struct sockaddr_in  addr;

    fd_set              fds;
    int                 max_fd;
    int                 count;
    struct timespec     timeout;

    const char         *port_str;
    char               *end;
    long                port;

    (void)argc;
    (void)argv;

    /*
     * Get server port from 'VSOA_AUTO_PORT' environment variable.
     * getenv() leaves the environment untouched, the value is only read.
     */
    port_str = getenv("VSOA_AUTO_PORT");
    if (port_str != NULL && port_str[0] != '\0') {
        port = strtol(port_str, &end, 10);
        if (end != port_str && *end == '\0' && port >= 0 && port <= 65535) {
            server_port = (uint16_t)port;
        } else {
            fprintf(stderr, "Invalid VSOA_AUTO_PORT '%s', use port %u.\n",
                    port_str, (unsigned)server_port);
        }
    }

    /* Set server listen address and port. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return  (-1);
    }

    url_read.url = "/read";
    url_read.url_len = strlen(url_read.url);
    if (!vsoa_server_add_listener(server, &url_read, command_read, NULL)) {
        printf("Cannot add listener of /read.\n");
        vsoa_server_close(server);
        return  (-1);
    }

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return  (-1);
    }

    printf("Started VSOA stream server.\n");

    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    /* Event loop: wait for server input and let the server handle it. */
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return  (0);
}
/*
 * end
 */

文档内容是否对您有所帮助?
有帮助
没帮助