开发示例
本节分别以客户端和服务端发送文件为例,介绍如何使用 VSOA Stream 发送文件。
客户端发送文件
下面以客户端发送文件、服务端接收文件为例,介绍使用 VSOA Stream 发送文件的开发流程和代码示例。客户端发送的文件为执行路径下的 client.send,用户需要自行创建该文件。
开发流程
- 客户端首先调用 RPC 接口发送写文件请求,将文件名和文件大小附加到 payload 参数发送给服务端。
- 服务端收到该 RPC 请求后,保存文件信息并创建一个用于接收该文件的 Stream 服务端,将 Stream 服务端的通道号通过 RPC 应答形式返回给客户端,启动接收线程,结束当前的 RPC 流程。
- 客户端根据服务端返回的流通道创建 Stream 客户端,并循环读取文件发送给服务端,直到发送完成,结束 Stream 客户端。
- 服务端循环接收客户端发送的数据保存到本地,直到接收完成,结束当前 Stream 服务并退出线程。
代码示例
代码示例如下,也可以直接下载 client 和 server 源码。
/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_client_send.c .
*
* Date: 2023-06-28
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"
/* The file name to send. */
#define SEND_FILE "client.send"
#ifndef MAX_PATH
#define MAX_PATH 260
#endif
/* The recv buffer size. */
#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN 1024
#endif
#define CHAR_BUFFER_LEN 128
/*
 * Return the size in bytes of the file at filepath.
 *
 * The file is opened in binary mode and its size is taken from the offset of
 * its end. On any failure (open, seek or tell) a message is written to stderr
 * and (size_t)-1 is returned, so callers must treat that value as an error.
 */
static size_t get_file_size (const char* filepath)
{
    FILE *fp;
    long  end_pos = -1;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return ((size_t)-1);
    }

    /* ftell() is only meaningful when the seek to the end succeeded. */
    if (fseek(fp, 0, SEEK_END) == 0) {
        end_pos = ftell(fp);
    }
    fclose(fp);

    if (end_pos < 0) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
        return ((size_t)-1);
    }

    return ((size_t)end_pos);
}
/*
 * Read the file at filepath and send its whole content to the stream socket.
 *
 * filepath: path of the file to send.
 * stream:   connected stream socket descriptor.
 *
 * Returns 0 when the complete file (possibly empty) has been sent, and -1 when
 * the file cannot be opened or read, the peer closes the stream, or send()
 * fails with an error other than EAGAIN/EWOULDBLOCK/EINTR. The file is closed
 * on every path.
 */
static int do_send_file (const char* filepath, int stream)
{
    uint8_t buffer[MAX_RECV_LEN];
    size_t  read_bytes;
    size_t  sent;
    size_t  total = 0;
    int     send_bytes;
    int     ret = -1;
    FILE   *fp;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s, errno is %d.\n", filepath, errno);
        return (-1);
    }

    /* Read the file and send to stream. */
    for (;;) {
        read_bytes = fread(buffer, 1, sizeof(buffer), fp);
        if (read_bytes == 0) {
            /* A zero-length read is either a real error or the normal end of file. */
            if (ferror(fp)) {
                fprintf(stderr, "Read file %s failed.\n", filepath);
                goto out;
            }
            break;
        }
        printf("Read bytes:%d\n", (int)read_bytes);

        sent = 0;
        while (sent < read_bytes) {
            /* Only the part of the chunk that is still unsent may be passed to send(). */
            send_bytes = send(stream, buffer + sent, read_bytes - sent, 0);
            if (send_bytes < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                    fprintf(stderr, "Continue send, retry.\n");
                    continue;
                }
                fprintf(stderr, "Invalid fault, errno is %d.\n", errno);
                goto out;
            }
            if (send_bytes == 0) {
                fprintf(stderr, "Peer closed.\n");
                goto out;
            }
            sent += (size_t)send_bytes;
            printf("Send %lu bytes.\n", (unsigned long)sent);
        }
        total += sent;
    }

    printf("Send file %s finished, size %lu Bytes.\n", filepath, (unsigned long)total);
    ret = 0;

out:
    fclose(fp);
    return (ret);
}
/*
 * VSOA callback function of /write url.
 *
 * Called with the reply to the "/write" RPC. When the reply header is present
 * and its status is 0, a client stream is created on the tunnel id taken from
 * the header, and SEND_FILE from the current working directory is sent
 * through it. The stream is closed before returning.
 *
 * arg and payload are not used.
 */
static void write_callback (void *arg, vsoa_client_t *client,
                            vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    int tunid;
    int status;
    int stream;
    int path_len;
    struct timespec timeout;
    char cwd[MAX_PATH];
    char filepath[MAX_PATH];

    (void)arg;
    (void)payload;

    /* Check if server response is valid */
    if (!vsoa_hdr) {
        fprintf(stderr, "Server is not responding.\n");
        return;
    }

    status = vsoa_parser_get_status(vsoa_hdr);
    if (status != 0) {
        fprintf(stderr, "Server reply status is %d.\n", status);
        return;
    }

    /*
     * Build the full file path before opening the stream, so a failure here
     * cannot leave a stream open. The working directory goes into its own
     * buffer: snprintf() must not read from the buffer it writes to.
     */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        return;
    }
    path_len = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (path_len < 0 || (size_t)path_len >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        return;
    }

    tunid = vsoa_parser_get_tunid(vsoa_hdr);
    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    /* Create vsoa client stream */
    stream = vsoa_client_stream_create(client, tunid, &timeout, 0);
    if (stream <= 0) {
        fprintf(stderr, "vsoa_client_stream_create: tunid=%d, failed\n", tunid);
        return;
    }

    printf("The full file path to send is %s.\n", filepath);
    if (do_send_file(filepath, stream) < 0) {
        fprintf(stderr, "Send file %s failed.\n", filepath);
    }

    vsoa_client_stream_close(stream);
}
/*
 * VSOA client connection callback.
 *
 * When connect is true, issues the "/write" RPC whose parameter is a JSON
 * object holding the name and size of SEND_FILE (looked up in the current
 * working directory); write_callback() is registered for the reply. The RPC
 * is not issued when the file size cannot be determined.
 *
 * arg and info are not used.
 */
static void connect_callback (void *arg, vsoa_client_auto_t *cliauto,
                              bool connect, const char *info)
{
    vsoa_client_t *client;
    vsoa_url_t url;
    vsoa_payload_t payload;
    char fileinfo[CHAR_BUFFER_LEN];
    char cwd[MAX_PATH];
    char filepath[MAX_PATH];
    size_t fsize;
    int n;

    (void)arg;
    (void)info;

    if (!connect) {
        fprintf(stderr, "Cannot connect to VSOA stream server.\n");
        return;
    }
    printf("Connected to VSOA stream server.\n");

    /* Get client handle. */
    client = vsoa_client_auto_handle(cliauto);

    /* The working directory needs its own buffer: snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        return;
    }
    n = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (n < 0 || (size_t)n >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        return;
    }

    /* get_file_size() reports failure as (size_t)-1; do not announce that as a size. */
    fsize = get_file_size(filepath);
    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
        return;
    }

    n = snprintf(fileinfo, sizeof(fileinfo), "{\"name\":\"%s\", \"size\":%lu}",
                 SEND_FILE, (unsigned long)fsize);
    if (n < 0 || (size_t)n >= sizeof(fileinfo)) {
        fprintf(stderr, "File info is too long.\n");
        return;
    }

    /* Set rpc call url to '/write' */
    url.url     = "/write";
    url.url_len = strlen(url.url);

    payload.data      = NULL;
    payload.data_len  = 0;
    payload.param     = fileinfo;
    payload.param_len = (size_t)n;

    if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET,
                          &url, &payload, write_callback, NULL, NULL)) {
        fprintf(stderr, "RPC call failed: %s (%d).\n", strerror(errno), errno);
    }
}
/*
 * Entry point of the file-sending client.
 *
 * Creates a VSOA client auto robot, registers connect_callback() and starts
 * the robot for "vsoa://vsoa_stream_server", then sleeps forever while the
 * callbacks do the work. Returns -1 when any setup step fails.
 */
int main(int argc, char *argv[])
{
    vsoa_client_auto_t *cliauto;

    (void)argc;
    (void)argv;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /* Create client auto robot */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    /*
     * Add a connection callback
     * The callback is called automatically when client is connected to the server.
     */
    if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
                                NULL, NULL, 0, 1000, 1000, 1000)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Client auto start failed: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    while (true) {
        sleep(1);
    }

    return (0);
}
/*
* end
*/
/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_server_recv.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"
#include "yyjson.h"
#ifndef MAX_PATH
#define MAX_PATH 260
#endif
/* The recv buffer size. */
#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN 1024
#endif
/*
 * The recv file information.
 * Filled by file_data_json_parse() from the JSON parameter of the request.
 */
struct file_data {
char filename[MAX_PATH];    /* Value of the JSON "name" string. */
size_t size;                /* Value of the JSON "size" number; do_recv_file() uses it as the byte count to receive. */
char md5[64];               /* Value of the JSON "md5" string when present; not checked by the code in this file. */
};
/*
 * The recv thread context.
 * Allocated in command_write() and passed to thread_read_stream(), which
 * closes the stream and frees the context when it is done.
 */
struct file_recv_ctx {
struct file_data fileinfo;      /* File information parsed from the request. */
vsoa_server_stream_t *stream;   /* Stream server created for this transfer. */
};
/*
 * Parse json to struct file_data format.
 *
 * file_info: destination structure.
 * str, len:  JSON text and its length in bytes.
 *
 * The JSON object must contain a string "name" and a non-negative number
 * "size"; a string "md5" is optional and stored as an empty string when
 * absent. Strings that do not fit into the fixed-size fields are rejected
 * instead of overflowing them.
 *
 * Returns 0 on success, -1 on failure.
 */
int file_data_json_parse (struct file_data *file_info, const char *str, size_t len)
{
    yyjson_doc *doc;
    yyjson_val *root, *item;
    const char *text;
    double      num;
    int         ret = -1;

    if (!str) {
        len = 0;
    }

    doc = yyjson_read(str, len, 0);
    if (!doc) {
        /* The input is length-delimited, so bound what is printed. */
        fprintf(stderr, "yyjson_read %.*s failed.\n", (int)len, str ? str : "");
        return (-1);
    }

    root = yyjson_doc_get_root(doc);
    if (!root) {
        printf("yyjson_doc_get_root failed.\n");
        goto out;
    }

    item = yyjson_obj_get(root, "name");
    if (!yyjson_is_str(item)) {
        goto out;
    }
    text = yyjson_get_str(item);
    if (strlen(text) >= sizeof(file_info->filename)) {
        fprintf(stderr, "File name is too long.\n");
        goto out;
    }
    strcpy(file_info->filename, text);                  /* Length checked above. */

    item = yyjson_obj_get(root, "size");
    if (!yyjson_is_num(item)) {
        goto out;
    }
    num = yyjson_get_num(item);
    /* Converting an out-of-range double to size_t is undefined; reject it. */
    if (num < 0.0 || num >= (double)(size_t)-1) {
        fprintf(stderr, "File size is invalid.\n");
        goto out;
    }
    file_info->size = (size_t)num;

    file_info->md5[0] = '\0';
    item = yyjson_obj_get(root, "md5");
    if (yyjson_is_str(item)) {
        text = yyjson_get_str(item);
        if (strlen(text) >= sizeof(file_info->md5)) {
            fprintf(stderr, "File md5 is too long.\n");
            goto out;
        }
        strcpy(file_info->md5, text);                   /* Length checked above. */
    }

    ret = 0;

out:
    yyjson_doc_free(doc);
    return (ret);
}
/*
 * Receive a file from the stream and save it in the current working directory.
 *
 * file_info: name and expected size of the file.
 * stream:    connected stream socket descriptor.
 *
 * At most file_info->size bytes are read from the stream. Returns 0 only when
 * exactly that many bytes were received and written; returns -1 when the path
 * cannot be built, the file cannot be opened or written, the peer closes the
 * stream early, or recv() fails with an error other than
 * EWOULDBLOCK/EAGAIN/EINTR.
 *
 * NOTE(review): file_info->filename comes from the peer and is joined to the
 * working directory unchanged - consider rejecting names that contain '/'.
 */
static int do_recv_file (struct file_data *file_info, int stream)
{
    uint8_t buffer[MAX_RECV_LEN];
    char    cwd[MAX_PATH];
    char    filepath[MAX_PATH];
    size_t  total;
    size_t  want;
    FILE   *fp;
    int     len;
    int     n;
    int     ret = -1;

    /* The working directory needs its own buffer: snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno=%d.\n", errno);
        return (-1);
    }
    n = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, file_info->filename);
    if (n < 0 || (size_t)n >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        return (-1);
    }

    fp = fopen(filepath, "wb+");
    if (fp == NULL) {
        fprintf(stderr, "Open file %s failed, errno=%d.\n", filepath, errno);
        return (-1);
    }

    total = file_info->size;
    while (total > 0) {
        /* Never ask for more than is still expected, so total cannot wrap around. */
        want = (total < sizeof(buffer)) ? total : sizeof(buffer);
        len  = recv(stream, buffer, want, 0);
        if (len > 0) {
            printf("Received %d bytes from VSOA stream.\n", len);
            if (fwrite(buffer, 1, (size_t)len, fp) != (size_t)len) {
                fprintf(stderr, "Write file %s failed, errno=%d.\n", filepath, errno);
                goto out;
            }
            total -= (size_t)len;
        } else if (len == 0) {
            printf("Error, peer closed.\n");
            break;
        } else if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) {
            printf("Recv continue.\n");
        } else {
            printf("Recv failed, errno is %d.\n", errno);
            goto out;
        }
        printf("Total %lu left now.\n", (unsigned long)total);
    }

    printf("Recv finished, recv %lu bytes.\n", (unsigned long)(file_info->size - total));

    /* A stream that ended before all announced bytes arrived is a failure. */
    if (total == 0) {
        ret = 0;
    }

out:
    if (fclose(fp) != 0) {
        ret = -1;
    }
    return (ret);
}
/*
 * Receive thread: waits up to 5 seconds for a client to connect to the stream,
 * accepts it and stores the incoming file with do_recv_file().
 *
 * arg is the struct file_recv_ctx handed over by the creator of the thread.
 * Whatever happens, the stream is closed and the context is freed before the
 * thread returns NULL.
 */
static void *thread_read_stream (void *arg)
{
    struct file_recv_ctx *ctx = (struct file_recv_ctx *)arg;
    struct timeval wait;
    fd_set readable;
    int ready;
    int peer;

    wait.tv_sec  = 5;
    wait.tv_usec = 0;

    FD_ZERO(&readable);
    FD_SET(ctx->stream->listenfd, &readable);

    ready = select(ctx->stream->listenfd + 1, &readable, NULL, NULL, &wait);
    if (ready <= 0) {
        fprintf(stderr, "Stream has no client connected.\n");
    } else {
        peer = vsoa_server_stream_accept(ctx->stream, NULL, NULL, 0);
        if (peer >= 0) {
            if (do_recv_file(&ctx->fileinfo, peer) < 0) {
                fprintf(stderr, "Recv file %s failed.\n", ctx->fileinfo.filename);
            }
        }
    }

    /* Common cleanup for every outcome. */
    if (ctx->stream) {
        vsoa_server_stream_close(ctx->stream);
    }
    free(ctx);

    return NULL;
}
/*
 * The callback function of command write
 *
 * Handles the "/write" RPC. The request parameter is parsed as file
 * information, a stream server is created for the transfer, a detached
 * receive thread is started and the tunnel id of the stream is returned to
 * the client in the reply.
 *
 * Every failure path releases what was acquired and sends a reply with a
 * non-zero status. NOTE(review): VSOA_STATUS_ARGUMENTS is assumed to be
 * provided by vsoa_parser.h - confirm.
 *
 * arg and url are not used.
 */
static void command_write (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                           vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t thread;
    vsoa_server_stream_t *stream;
    struct file_recv_ctx *file_recv_ctx;
    int tunid;

    (void)arg;
    (void)url;

    if (!payload || !payload->param || payload->param_len == 0) {
        fprintf(stderr, "command_write request has no parameter.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_ARGUMENTS, seqno, 0, NULL);
        return;
    }

    /* The parameter is length-delimited, so bound what is printed. */
    printf("Receive command write, param is %.*s.\n", (int)payload->param_len, payload->param);

    file_recv_ctx = (struct file_recv_ctx *)malloc(sizeof(*file_recv_ctx));
    if (file_recv_ctx == NULL) {
        fprintf(stderr, "Malloc file_recv_ctx failed, errno is %d.\n", errno);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    if (file_data_json_parse(&file_recv_ctx->fileinfo, payload->param, payload->param_len) < 0) {
        fprintf(stderr, "Parse file info failed. \n");
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_ARGUMENTS, seqno, 0, NULL);
        return;
    }

    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "command_write Cannot create server stream.\n");
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    file_recv_ctx->stream = stream;

    /*
     * Remember the tunnel id now: once the thread runs it owns the stream and
     * may close it at any time, so the stream must not be touched afterwards.
     */
    tunid = stream->tunid;

    if (pthread_create(&thread, NULL, thread_read_stream, file_recv_ctx) != 0) {
        fprintf(stderr, "Create recv thread failed.\n");
        vsoa_server_stream_close(stream);
        free(file_recv_ctx);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    /* Nobody joins the thread; detach it so its resources are released on exit. */
    pthread_detach(thread);

    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, NULL);
    printf("Replied client with tunnel ID: %d.\n", tunid);
}
/*
 * Entry point of the file-receiving server.
 *
 * Listens on port 3001, or on the port given by the VSOA_AUTO_PORT
 * environment variable when it holds a valid port number (1-65535). Registers
 * command_write() for the "/write" URL, starts the server and then dispatches
 * server events forever. Returns -1 when the server cannot be set up.
 */
int main (int argc, char *argv[])
{
    vsoa_server_t *server;
    vsoa_url_t url_write;
    uint16_t server_port = 3001;
    struct sockaddr_in addr;
    fd_set fds;
    int max_fd;
    int count;
    struct timespec timeout;
    const char *port_env;
    char *end;
    long port;

    (void)argc;
    (void)argv;

    /*
     * Get server port from 'VSOA_AUTO_PORT' environment variable.
     * getenv() is used so that the environment strings are only read, never
     * modified in place.
     */
    port_env = getenv("VSOA_AUTO_PORT");
    if (port_env != NULL && *port_env != '\0') {
        port = strtol(port_env, &end, 10);
        if (end != port_env && *end == '\0' && port > 0 && port <= 65535) {
            server_port = (uint16_t)port;
        } else {
            fprintf(stderr, "Ignore invalid VSOA_AUTO_PORT value, use port %u.\n",
                    (unsigned)server_port);
        }
    }

    /* Set server listen address and port. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return (-1);
    }

    url_write.url     = "/write";
    url_write.url_len = strlen(url_write.url);
    if (!vsoa_server_add_listener(server, &url_write, command_write, NULL)) {
        printf("Cannot add listener for /write.\n");
        vsoa_server_close(server);
        return (-1);
    }

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return (-1);
    }
    printf("Started VSOA stream server.\n");

    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}
/*
* end
*/
服务端发送文件
下面以服务端发送文件、客户端接收文件为例,介绍使用 VSOA Stream 发送文件的开发流程和代码示例。服务端发送的文件为执行路径下的 server.send,用户需要自行创建该文件。
开发流程
- 客户端首先调用 RPC 接口发送读文件请求。
- 服务端收到该 RPC 请求后,创建一个用于发送该文件的 Stream 服务端,将 Stream 服务端的通道号通过 RPC 应答形式返回给客户端,并将要发送的文件信息附加到 payload 中。启动发送线程,结束当前的 RPC 流程。
- 客户端接收应答,保存要接收的文件信息,并根据服务端返回的流通道创建 Stream 客户端,开始接收文件。
- Stream 服务端收到 Stream 客户端连接后,循环读取本地文件并发送,直到发送完成,结束当前 Stream 服务并退出线程。
- 客户端接收完文件后保存文件,关闭 Stream 客户端,结束当前 RPC 应答回调。
代码示例
代码示例如下,也可以直接下载 client 和 server 源码。
/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_client_recv.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include "yyjson.h"
#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"
#ifndef MAX_PATH
#define MAX_PATH 260
#endif
#ifndef MAX_RECV_LEN
#define MAX_RECV_LEN 1024
#endif
/*
 * File information.
 * Filled by file_data_json_parse() from the JSON parameter of the server reply.
 */
struct file_data {
char filename[MAX_PATH];    /* Value of the JSON "name" string. */
size_t size;                /* Value of the JSON "size" number; do_recv_file() uses it as the byte count to receive. */
char md5[64];               /* Value of the JSON "md5" string when present; not checked by the code in this file. */
};
/*
 * Parse json to struct file_data format.
 *
 * file_info: destination structure.
 * str, len:  JSON text and its length in bytes.
 *
 * The JSON object must contain a string "name" and a non-negative number
 * "size"; a string "md5" is optional and stored as an empty string when
 * absent. Strings that do not fit into the fixed-size fields are rejected
 * instead of overflowing them.
 *
 * Returns 0 on success, -1 on failure.
 */
static int file_data_json_parse (struct file_data *file_info, const char *str, size_t len)
{
    yyjson_doc *doc;
    yyjson_val *root, *item;
    const char *text;
    double      num;
    int         ret = -1;

    if (!str) {
        len = 0;
    }

    doc = yyjson_read(str, len, 0);
    if (!doc) {
        /* The input is length-delimited, so bound what is printed. */
        fprintf(stderr, "yyjson_read %.*s failed.\n", (int)len, str ? str : "");
        return (-1);
    }

    root = yyjson_doc_get_root(doc);
    if (!root) {
        printf("yyjson_doc_get_root failed.\n");
        goto out;
    }

    item = yyjson_obj_get(root, "name");
    if (!yyjson_is_str(item)) {
        goto out;
    }
    text = yyjson_get_str(item);
    if (strlen(text) >= sizeof(file_info->filename)) {
        fprintf(stderr, "File name is too long.\n");
        goto out;
    }
    strcpy(file_info->filename, text);                  /* Length checked above. */

    item = yyjson_obj_get(root, "size");
    if (!yyjson_is_num(item)) {
        goto out;
    }
    num = yyjson_get_num(item);
    /* Converting an out-of-range double to size_t is undefined; reject it. */
    if (num < 0.0 || num >= (double)(size_t)-1) {
        fprintf(stderr, "File size is invalid.\n");
        goto out;
    }
    file_info->size = (size_t)num;

    file_info->md5[0] = '\0';
    item = yyjson_obj_get(root, "md5");
    if (yyjson_is_str(item)) {
        text = yyjson_get_str(item);
        if (strlen(text) >= sizeof(file_info->md5)) {
            fprintf(stderr, "File md5 is too long.\n");
            goto out;
        }
        strcpy(file_info->md5, text);                   /* Length checked above. */
    }

    ret = 0;

out:
    yyjson_doc_free(doc);
    return (ret);
}
/*
 * Receive a file from the stream and save it in the current working directory.
 *
 * file_info: name and expected size of the file.
 * stream:    connected stream socket descriptor.
 *
 * At most file_info->size bytes are read from the stream. Returns 0 only when
 * exactly that many bytes were received and written; returns -1 when the path
 * cannot be built, the file cannot be opened or written, the peer closes the
 * stream early, or recv() fails with an error other than
 * EWOULDBLOCK/EAGAIN/EINTR.
 *
 * NOTE(review): file_info->filename comes from the peer and is joined to the
 * working directory unchanged - consider rejecting names that contain '/'.
 */
static int do_recv_file (struct file_data *file_info, int stream)
{
    uint8_t buffer[MAX_RECV_LEN];
    char    cwd[MAX_PATH];
    char    filepath[MAX_PATH];
    size_t  total;
    size_t  want;
    FILE   *fp;
    int     len;
    int     n;
    int     ret = -1;

    /*
     * Get the full path to recv file.
     * The working directory needs its own buffer: snprintf() arguments must
     * not overlap.
     */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d!\n", errno);
        return (-1);
    }
    n = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, file_info->filename);
    if (n < 0 || (size_t)n >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        return (-1);
    }
    printf("Recv full filename is %s.\n", filepath);

    fp = fopen(filepath, "wb+");
    if (fp == NULL) {
        fprintf(stderr, "Open file %s failed, errno is %d!\n", filepath, errno);
        return (-1);
    }

    total = file_info->size;
    while (total > 0) {
        /* Never ask for more than is still expected, so total cannot wrap around. */
        want = (total < sizeof(buffer)) ? total : sizeof(buffer);
        len  = recv(stream, buffer, want, 0);
        if (len > 0) {
            printf("Received %d bytes from VSOA stream.\n", len);
            if (fwrite(buffer, 1, (size_t)len, fp) != (size_t)len) {
                fprintf(stderr, "Write file %s failed, errno is %d!\n", filepath, errno);
                goto out;
            }
            total -= (size_t)len;
        } else if (len == 0) {
            printf("Peer closed.\n");
            break;
        } else if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) {
            printf("Recv continue.\n");
        } else {
            fprintf(stderr, "Recv failed, errno is %d.\n", errno);
            goto out;
        }
        printf("Total %lu left now.\n", (unsigned long)total);
    }

    printf("Recv file %s finished, size is %lu.\n", filepath,
           (unsigned long)(file_info->size - total));

    /* A stream that ended before all announced bytes arrived is a failure. */
    if (total == 0) {
        ret = 0;
    }

out:
    if (fclose(fp) != 0) {
        ret = -1;
    }
    return (ret);
}
/*
 * VSOA client callback about /read url.
 *
 * Validates the reply (header present, status 0, non-empty parameter), parses
 * the parameter as file information, creates a client stream on the tunnel id
 * found in the reply header and receives the file through it. The stream is
 * closed before returning, whether or not the transfer succeeded.
 */
static void read_callback (void *arg, vsoa_client_t *client,
                           vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    struct file_data info;
    struct timespec wait;
    int code;
    int tunnel;
    int fd;

    (void)arg;

    /* Check if server response is valid. */
    if (!vsoa_hdr) {
        fprintf(stderr, "Server is not responding.\n");
        return;
    }

    code = vsoa_parser_get_status(vsoa_hdr);
    if (code != 0) {
        fprintf(stderr, "Server reply status is %d.\n", code);
        return;
    }

    if (!(payload && payload->param && payload->param_len != 0)) {
        fprintf(stderr, "Server responds invalid payload.\n");
        return;
    }

    if (file_data_json_parse(&info, payload->param, payload->param_len) < 0) {
        fprintf(stderr, "Parse file info failed.\n");
        return;
    }

    wait.tv_sec  = 1;
    wait.tv_nsec = 0;

    tunnel = vsoa_parser_get_tunid(vsoa_hdr);
    fd = vsoa_client_stream_create(client, tunnel, &wait, 0);
    if (fd <= 0) {
        fprintf(stderr, "vsoa_client_stream_create: tunid=%d, failed.\n", tunnel);
        return;
    }

    /* The stream is closed on both outcomes; only the failure is reported. */
    if (do_recv_file(&info, fd) < 0) {
        fprintf(stderr, "do_recv_file failed.\n");
    }
    vsoa_client_stream_close(fd);
}
/*
 * VSOA client connection callback.
 *
 * When connect is true, issues the "/read" RPC without a payload and registers
 * read_callback() for the reply; otherwise only reports the failed connection.
 */
static void connect_callback (void *arg, vsoa_client_auto_t *cliauto,
                              bool connect, const char *info)
{
    vsoa_url_t target;
    vsoa_client_t *handle;

    if (connect) {
        printf("Connected to VSOA stream server.\n");

        handle = vsoa_client_auto_handle(cliauto);

        target.url     = "/read";
        target.url_len = strlen(target.url);

        if (!vsoa_client_call(handle, VSOA_CLIENT_RPC_METHOD_GET,
                              &target, NULL, read_callback, NULL, NULL)) {
            fprintf(stderr, "RPC call failed: %s (%d).\n", strerror(errno), errno);
        }
    } else {
        fprintf(stderr, "Cannot connect to VSOA stream server.\n");
    }
}
/*
 * Entry point of the file-receiving client.
 *
 * Creates a VSOA client auto robot, registers connect_callback() and starts
 * the robot for "vsoa://vsoa_stream_server", then sleeps forever while the
 * callbacks do the work. Returns -1 when any setup step fails.
 */
int main (int argc, char *argv[])
{
    vsoa_client_auto_t *cliauto;

    (void)argc;
    (void)argv;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /* Create client auto robot. */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    /*
     * Add a connection callback
     * The callback is called automatically when client is connected to the server.
     */
    if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
                                NULL, NULL, 0, 1000, 1000, 1000)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Client auto start failed: %s (%d).\n", strerror(errno), errno);
        return (-1);
    }

    while (true) {
        sleep(1);
    }

    return (0);
}
/*
* end
*/
/*
* Copyright (c) 2023 ACOAUTO Team.
* All rights reserved.
*
* Detailed license information can be found in the LICENSE file.
*
* File: stream_server_send.c .
*
* Author: Yan.chaodong <yanchaodong@acoinfo.com>
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"
#include "yyjson.h"
/* The send file name. */
#define SEND_FILE "server.send"
/* The recv buffer size. */
#ifndef MAX_SEND_LEN
#define MAX_SEND_LEN 1024
#endif
#ifndef MAX_PATH
#define MAX_PATH 260
#endif
#define CHAR_BUFFER_LEN 128
/*
 * Return the size in bytes of the file at filepath.
 *
 * The file is opened in binary mode and its size is taken from the offset of
 * its end. On any failure (open, seek or tell) a message is written to stderr
 * and (size_t)-1 is returned, so callers must treat that value as an error.
 */
static size_t get_file_size (const char* filepath)
{
    FILE *fp;
    long  end_pos = -1;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return ((size_t)-1);
    }

    /* ftell() is only meaningful when the seek to the end succeeded. */
    if (fseek(fp, 0, SEEK_END) == 0) {
        end_pos = ftell(fp);
    }
    fclose(fp);

    if (end_pos < 0) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
        return ((size_t)-1);
    }

    return ((size_t)end_pos);
}
/*
 * Read the file at filepath and send its whole content to the stream socket.
 *
 * filepath: path of the file to send.
 * stream:   connected stream socket descriptor.
 *
 * Returns 0 when the complete file (possibly empty) has been sent, and -1 when
 * the file cannot be opened or read, the peer closes the stream, or send()
 * fails with an error other than EAGAIN/EWOULDBLOCK/EINTR. The file is closed
 * on every path.
 */
static int do_send_file(const char* filepath, int stream)
{
    char    buffer[MAX_SEND_LEN];
    size_t  read_bytes;
    size_t  sent;
    size_t  total = 0;
    int     send_bytes;
    int     ret = -1;
    FILE   *fp;

    fp = fopen(filepath, "rb");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open file %s.\n", filepath);
        return (-1);
    }

    /* Read the file and send to stream. */
    for (;;) {
        read_bytes = fread(buffer, 1, sizeof(buffer), fp);
        if (read_bytes == 0) {
            /* A zero-length read is either a real error or the normal end of file. */
            if (ferror(fp)) {
                fprintf(stderr, "Read file %s failed.\n", filepath);
                goto out;
            }
            break;
        }
        printf("Read %d bytes.\n", (int)read_bytes);

        sent = 0;
        while (sent < read_bytes) {
            /* Only the part of the chunk that is still unsent may be passed to send(). */
            send_bytes = send(stream, buffer + sent, read_bytes - sent, 0);
            if (send_bytes < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                    fprintf(stderr, "Continue send, retry.\n");
                    continue;
                }
                fprintf(stderr, "Invalid fault, errno is %d.\n", errno);
                goto out;
            }
            if (send_bytes == 0) {
                fprintf(stderr, "Peer closed.\n");
                goto out;
            }
            sent += (size_t)send_bytes;
        }
        total += sent;
    }

    printf("Send file %s finished, size %lu Bytes.\n", filepath, (unsigned long)total);
    ret = 0;

out:
    fclose(fp);
    return (ret);
}
/*
 * The thread function for read file and send to stream.
 *
 * arg is the stream server created for this transfer. The thread waits up to
 * 5 seconds for a client to connect, accepts it and sends SEND_FILE from the
 * current working directory. The stream is closed on every path before the
 * thread returns NULL.
 */
static void *thread_read_stream (void *arg)
{
    vsoa_server_stream_t *stream = (vsoa_server_stream_t *)arg;
    struct timeval timeout;
    fd_set fds;
    int count;
    int client;
    int n;
    char cwd[MAX_PATH];
    char filepath[MAX_PATH];

    /* Set client stream connection timeout to 5s. */
    timeout.tv_sec  = 5;
    timeout.tv_usec = 0;

    FD_ZERO(&fds);
    FD_SET(stream->listenfd, &fds);

    count = select(stream->listenfd + 1, &fds, NULL, NULL, &timeout);
    if (count <= 0) {
        fprintf(stderr, "Stream has no client connected.\n");
        goto out;
    }

    client = vsoa_server_stream_accept(stream, NULL, NULL, 0);
    if (client < 0) {
        goto out;
    }

    /* The working directory needs its own buffer: snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        goto out;
    }
    n = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (n < 0 || (size_t)n >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        goto out;
    }

    if (do_send_file(filepath, client) < 0) {
        fprintf(stderr, "Send file failed.\n");
    }

out:
    /* The thread owns the stream, so it is released here on every path. */
    vsoa_server_stream_close(stream);
    return (NULL);
}
/*
 * The VSOA callback function to read file and create send thread.
 *
 * Handles the "/read" RPC. The size of SEND_FILE in the current working
 * directory is looked up, a stream server is created, the client is answered
 * with the tunnel id of the stream plus a JSON parameter holding the file name
 * and size, and a detached thread is started to send the file.
 *
 * When the file information cannot be produced or the stream cannot be
 * created, the client is answered with a non-zero status.
 * VSOA_STATUS_NO_MEMORY is used for all of these, as the sample has no
 * dedicated status for a missing file.
 *
 * arg, url and payload are not used.
 */
static void command_read(void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                         vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t thread;
    vsoa_server_stream_t *stream;
    vsoa_payload_t reply;
    char fileinfo[CHAR_BUFFER_LEN];
    char cwd[MAX_PATH];
    char filepath[MAX_PATH];
    size_t fsize;
    int tunid;
    int n;

    (void)arg;
    (void)url;
    (void)payload;

    /* The working directory needs its own buffer: snprintf() arguments must not overlap. */
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        fprintf(stderr, "getcwd failed, errno is %d.\n", errno);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    n = snprintf(filepath, sizeof(filepath), "%s/%s", cwd, SEND_FILE);
    if (n < 0 || (size_t)n >= sizeof(filepath)) {
        fprintf(stderr, "File path is too long.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    /* get_file_size() reports failure as (size_t)-1; do not announce that as a size. */
    fsize = get_file_size(filepath);
    if (fsize == (size_t)-1) {
        fprintf(stderr, "Cannot get size of file %s.\n", filepath);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    n = snprintf(fileinfo, sizeof(fileinfo), "{\"name\":\"%s\", \"size\":%lu}",
                 SEND_FILE, (unsigned long)fsize);
    if (n < 0 || (size_t)n >= sizeof(fileinfo)) {
        fprintf(stderr, "File info is too long.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "Cannot create server stream.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }
    printf("vsoa_server_stream_create stream=%p.\n", (void *)stream);

    /*
     * Remember the tunnel id now: once the thread runs it owns the stream and
     * may close it at any time, so the stream must not be touched afterwards.
     */
    tunid = stream->tunid;

    reply.data      = NULL;
    reply.data_len  = 0;
    reply.param     = fileinfo;
    reply.param_len = (size_t)n;

    /* Make a client reply, include filesize and filename. */
    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, &reply);

    /* create dedicated thread to write stream */
    if (pthread_create(&thread, NULL, thread_read_stream, stream) != 0) {
        fprintf(stderr, "Create send thread failed.\n");
        vsoa_server_stream_close(stream);
        return;
    }

    /* Nobody joins the thread; detach it so its resources are released on exit. */
    pthread_detach(thread);

    printf("Replied client with tunnel ID: %d.\n", tunid);
}
/*
 * Entry point of the file-sending server.
 *
 * Listens on port 3001, or on the port given by the VSOA_AUTO_PORT
 * environment variable when it holds a valid port number (1-65535). Registers
 * command_read() for the "/read" URL, starts the server and then dispatches
 * server events forever. Returns -1 when the server cannot be set up.
 */
int main (int argc, char *argv[])
{
    vsoa_server_t *server;
    vsoa_url_t url_read;
    uint16_t server_port = 3001;
    struct sockaddr_in addr;
    fd_set fds;
    int max_fd;
    int count;
    struct timespec timeout;
    const char *port_env;
    char *end;
    long port;

    (void)argc;
    (void)argv;

    /*
     * Get server port from 'VSOA_AUTO_PORT' environment variable.
     * getenv() is used so that the environment strings are only read, never
     * modified in place.
     */
    port_env = getenv("VSOA_AUTO_PORT");
    if (port_env != NULL && *port_env != '\0') {
        port = strtol(port_env, &end, 10);
        if (end != port_env && *end == '\0' && port > 0 && port <= 65535) {
            server_port = (uint16_t)port;
        } else {
            fprintf(stderr, "Ignore invalid VSOA_AUTO_PORT value, use port %u.\n",
                    (unsigned)server_port);
        }
    }

    /* Set server listen address and port. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return (-1);
    }

    url_read.url     = "/read";
    url_read.url_len = strlen(url_read.url);
    if (!vsoa_server_add_listener(server, &url_read, command_read, NULL)) {
        printf("Cannot add listener for /read.\n");
        vsoa_server_close(server);
        return (-1);
    }

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return (-1);
    }
    printf("Started VSOA stream server.\n");

    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}
/*
* end
*/