TaskMsgBus

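1. Introduction

This package implements a message bus based on RT-Thread that makes it easy to synchronize threads and exchange messages between them. It supports sending and receiving messages of arbitrary complexity, including text, numbers, and structs. When several threads subscribe to and consume the same message, memory usage does not grow, and setting a release hook function for a message lets its memory be reclaimed automatically.

1.1 Directory structure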

Name      Description
examples  Examples
inc       Header files
src       Source files

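1.2 License

The TaskMsgBus package is licensed under the Apache License v2.0; see the LICENSE file for details.

1.3 Dependencies

None. When JSON message support is enabled, the cJSON package is selected automatically.

2. How to enable the TaskMsgBus package

To use the TaskMsgBus package, select it in the RT-Thread package manager at the following path: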

RT-Thread online packages
    system packages --->
        [*]TaskMsgBus: For sending and receiving json/text/object messages between threads based on RT-Thread
        TaskMsgBus --->
            [*]task msg name define in user file 'task_msg_user_def.h'
            [*]task msg format using json
            [*]task msg object using dynamic memory
            [*]Enable TaskMsgBus Sample

3. Using the TaskMsgBus package

Enable the TaskMsgBus package as described above, enable the sample, and build the project. The sample's output then appears in the console.

3.1 API list

API
    Function
rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t priority, rt_uint32_t tick);
    Initialize the message bus
rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
    Subscribe to a message
rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
    Unsubscribe from a message
rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *msg_text);
    Publish a text/JSON message
rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_size_t msg_size);
    Publish a message of any data type
int task_msg_subscriber_create(enum task_msg_name msg_name);
    Create a message subscriber; returns the subscriber ID
int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len);
    Create a message subscriber that can subscribe to several topics; returns the subscriber ID
rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct task_msg_args **out_args);
    Block until a message arrives for the given subscriber
void task_msg_release(task_msg_args_t args);
    Release a message that has been consumed
void task_msg_subscriber_delete(int subscriber_id);
    Delete a message subscriber

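3.2 Usage

  • In the package manager, clear the Enable TaskMsgBus Sample option
  • Following "task_msg_bus_user_def.h" in the examples folder, create your own header file "task_msg_bus_user_def.h"

Define the enumeration of message names, enum task_msg_name, for example: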

enum task_msg_name{
    TASK_MSG_OS_REDAY = 0,  //RT_NULL
    TASK_MSG_NET_REDAY,     //json: net_reday:int,ip:string,id:int
    TASK_MSG_1,
    TASK_MSG_2,             //struct msg_2_def
    TASK_MSG_3,             //struct msg_3_def
    TASK_MSG_4,
    TASK_MSG_5,
    TASK_MSG_COUNT
};
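
Because TASK_MSG_COUNT is the last enumerator, its value equals the number of message names, so it can size per-message tables and bound-check incoming values. The sketch below is illustrative only: it uses a shortened copy of the enum so that it compiles on its own, and `demo_names` and `demo_msg_name_str` are hypothetical helpers, not part of TaskMsgBus.

```c
#include <stddef.h>

/* Shortened copy of the enum above, so the sketch compiles on its own. */
enum task_msg_name {
    TASK_MSG_OS_REDAY = 0,
    TASK_MSG_NET_REDAY,
    TASK_MSG_1,
    TASK_MSG_COUNT /* sentinel: number of message names */
};

/* Hypothetical lookup table, sized by the sentinel. */
static const char *const demo_names[TASK_MSG_COUNT] = {
    [TASK_MSG_OS_REDAY]  = "os_reday",
    [TASK_MSG_NET_REDAY] = "net_reday",
    [TASK_MSG_1]         = "msg_1",
};

/* Returns a printable name, or NULL when the value is out of range. */
static const char *demo_msg_name_str(int name)
{
    if (name < 0 || name >= TASK_MSG_COUNT)
        return NULL;
    return demo_names[name];
}
```

Keeping the sentinel last means a newly added message name automatically enlarges every table sized this way.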

To use struct messages, define the message structs, for example:

struct msg_2_def
{
    int id;
    char name[8];
};

struct msg_3_def
{
    int id;
    char name[8];
    rt_uint8_t *buffer;
    rt_size_t buffer_size;
};

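To dynamically allocate memory for a pointer field of a struct, enable [task msg object using dynamic memory] in the package manager as shown earlier, and define a hook function that releases the dynamically allocated memory belonging to that message, for example: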

extern void msg_3_release_hook(void *args);
#define task_msg_release_hooks {\
        {TASK_MSG_OS_REDAY, RT_NULL},   \
        {TASK_MSG_NET_REDAY, RT_NULL},  \
        {TASK_MSG_1, RT_NULL},          \
        {TASK_MSG_2, RT_NULL},          \
        {TASK_MSG_3, msg_3_release_hook},          \
        {TASK_MSG_4, RT_NULL},          \
        {TASK_MSG_5, RT_NULL},          \
    }

Implement the hook function in one of your *.c files, for example:

void msg_3_release_hook(void *args)
{
    struct msg_3_def *msg_3 = (struct msg_3_def *)args;
    if(msg_3->buffer)
        rt_free(msg_3->buffer);
}
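
One plausible way such a hook table is consumed: when a message is finally discarded, the bus looks up the hook registered for its message name and calls it before freeing the message object itself. The following sketch shows that idea in plain C, with `malloc`/`free` standing in for `rt_malloc`/`rt_free`. Every `demo_` name is hypothetical, and the package's real dispatch code may differ.

```c
#include <stdlib.h>
#include <string.h>

enum demo_msg_name { DEMO_MSG_2 = 0, DEMO_MSG_3, DEMO_MSG_COUNT };

struct demo_msg_3 {
    int id;
    unsigned char *buffer; /* heap memory owned by the message */
    size_t buffer_size;
};

struct demo_release_hook {
    enum demo_msg_name name;
    void (*hook)(void *msg_obj); /* NULL: nothing extra to free */
};

static int demo_hook_calls; /* counts hook invocations, for the demo only */

static void demo_msg_3_release_hook(void *msg_obj)
{
    struct demo_msg_3 *m = msg_obj;
    free(m->buffer); /* free(NULL) is a no-op */
    m->buffer = NULL;
    demo_hook_calls++;
}

static const struct demo_release_hook demo_hooks[] = {
    { DEMO_MSG_2, NULL },
    { DEMO_MSG_3, demo_msg_3_release_hook },
};

/* Runs the hook registered for name (if any), then frees the object itself. */
static void demo_release(enum demo_msg_name name, void *msg_obj)
{
    for (size_t i = 0; i < sizeof(demo_hooks) / sizeof(demo_hooks[0]); i++) {
        if (demo_hooks[i].name == name && demo_hooks[i].hook != NULL)
            demo_hooks[i].hook(msg_obj);
    }
    free(msg_obj);
}

/* Builds a heap-allocated DEMO_MSG_3 object carrying a copy of data (len > 0). */
static struct demo_msg_3 *demo_make_msg_3(int id, const void *data, size_t len)
{
    struct demo_msg_3 *m = malloc(sizeof(*m));
    if (m == NULL)
        return NULL;
    m->id = id;
    m->buffer = malloc(len);
    if (m->buffer == NULL) {
        free(m);
        return NULL;
    }
    memcpy(m->buffer, data, len);
    m->buffer_size = len;
    return m;
}
```

A message type without pointer fields simply registers a NULL hook, so only the object itself is freed.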
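  • Initialization
task_msg_bus_init(512, 11, 10); //initialize the message bus (thread stack size, priority, time slice)

Calling this function dynamically creates one dispatch thread for the message bus.

  • Subscribe to messages (suitable when the callback only does lightweight work)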
static void net_reday_callback(task_msg_args_t args)
{
    LOG_D("[net_reday_callback]:TASK_MSG_NET_REDAY => args->msg_name:%d, args->msg_obj:%s", args->msg_name, args->msg_obj);
}

task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
  • Unsubscribe from messages
task_msg_unsubscribe(TASK_MSG_NET_REDAY, net_reday_callback);
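
To make the callback model concrete, here is a minimal, single-threaded sketch of a callback registry in plain C. It is not the package's implementation: `demo_subscribe`, `demo_unsubscribe`, `demo_publish`, the counting callback, and the fixed slot count are all assumptions made for illustration, and the real bus dispatches from the thread created by task_msg_bus_init rather than from the publisher.

```c
#include <stddef.h>

enum demo_msg_name { DEMO_MSG_A = 0, DEMO_MSG_B, DEMO_MSG_COUNT };

typedef void (*demo_callback_t)(enum demo_msg_name name, const char *text);

#define DEMO_MAX_CALLBACKS 4 /* arbitrary limit per message name */

static demo_callback_t demo_slots[DEMO_MSG_COUNT][DEMO_MAX_CALLBACKS];
static int demo_seen; /* incremented by the example callback below */

/* Example callback: only counts how often it was invoked. */
static void demo_count_cb(enum demo_msg_name name, const char *text)
{
    (void)name;
    (void)text;
    demo_seen++;
}

/* Registers cb for name; returns 0 on success, -1 on bad input or full table. */
static int demo_subscribe(enum demo_msg_name name, demo_callback_t cb)
{
    if (cb == NULL || name >= DEMO_MSG_COUNT)
        return -1;
    for (size_t i = 0; i < DEMO_MAX_CALLBACKS; i++) {
        if (demo_slots[name][i] == NULL) {
            demo_slots[name][i] = cb;
            return 0;
        }
    }
    return -1;
}

/* Removes cb from name; returns 0 on success, -1 if it was not registered. */
static int demo_unsubscribe(enum demo_msg_name name, demo_callback_t cb)
{
    if (cb == NULL || name >= DEMO_MSG_COUNT)
        return -1;
    for (size_t i = 0; i < DEMO_MAX_CALLBACKS; i++) {
        if (demo_slots[name][i] == cb) {
            demo_slots[name][i] = NULL;
            return 0;
        }
    }
    return -1;
}

/* Calls every callback registered for name; returns how many were called. */
static int demo_publish(enum demo_msg_name name, const char *text)
{
    int called = 0;
    if (name >= DEMO_MSG_COUNT)
        return 0;
    for (size_t i = 0; i < DEMO_MAX_CALLBACKS; i++) {
        if (demo_slots[name][i] != NULL) {
            demo_slots[name][i](name, text);
            called++;
        }
    }
    return called;
}
```

Because every callback runs back to back inside the publish loop, one slow callback delays all the others, which is why callbacks should stay lightweight.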
  • Publish messages

Without message content:

task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);

text/json messages:

char msg_text[50];
rt_snprintf(msg_text, sizeof(msg_text), "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
task_msg_publish(TASK_MSG_NET_REDAY, msg_text);
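
A fixed 50-byte buffer leaves little headroom for this JSON text, so it can help to detect truncation before publishing. The sketch below uses the standard C `snprintf`, whose return value is the length the complete output would have had. `demo_format_net_reday` is a hypothetical helper, and whether `rt_snprintf` reports truncation the same way should be checked against the RT-Thread version in use.

```c
#include <stdio.h>
#include <stddef.h>

/* Formats the message; returns 0 on success, -1 if it did not fit in buf. */
static int demo_format_net_reday(char *buf, size_t size, int ready,
                                 const char *ip, long id)
{
    int n = snprintf(buf, size, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}",
                     ready, ip, id);
    if (n < 0 || (size_t)n >= size)
        return -1; /* encoding error or truncated output */
    return 0;
}
```

A caller would publish the text only when the helper returns 0, so that a truncated, invalid JSON string never reaches subscribers.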

Struct messages (no dynamically allocated fields):

struct msg_2_def msg_2;
msg_2.id = i;
rt_snprintf(msg_2.name, sizeof(msg_2.name), "%s", "hello");
task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));

Struct messages (with dynamically allocated fields):

const rt_uint8_t buffer_test[32] = {
    0x0F, 0x51, 0xEE, 0x89, 0x9D, 0x40, 0x80, 0x22, 0x63, 0x44, 0x43, 0x39, 0x55, 0x2D, 0x12, 0xA1,
    0x1C, 0x91, 0xE5, 0x2C, 0xC4, 0x6A, 0x62, 0x5B, 0xB6, 0x41, 0xF0, 0xF7, 0x75, 0x48, 0x05, 0xE9
};

struct msg_3_def msg_3;
msg_3.id = i;
rt_snprintf(msg_3.name, sizeof(msg_3.name), "%s", "slyant");
msg_3.buffer = rt_calloc(1, sizeof(buffer_test));
if(msg_3.buffer != RT_NULL)
{
    rt_memcpy(msg_3.buffer, buffer_test, sizeof(buffer_test));
    msg_3.buffer_size = sizeof(buffer_test);
    //msg_3.buffer is freed later by the release hook registered for TASK_MSG_3
    task_msg_publish_obj(TASK_MSG_3, &msg_3, sizeof(struct msg_3_def));
}
  • Receive messages by blocking a thread

Receive one specific message:

static void msg_wait_thread_entry(void *params)
{
    rt_err_t rst;
    task_msg_args_t args;
    //create a message subscriber
    int subscriber_id = task_msg_subscriber_create(TASK_MSG_NET_REDAY);
    if(subscriber_id < 0) return;

    while(1)
    {
        rst = task_msg_wait_until(subscriber_id, 50, &args);
        if(rst==RT_EOK)
        {
            LOG_D("[task_msg_wait_until]:TASK_MSG_NET_REDAY => args->msg_name:%d, args->msg_obj:%s", args->msg_name, args->msg_obj);
            rt_thread_mdelay(200);//simulate a time-consuming operation; messages published meanwhile are not lost
            //release the message
            task_msg_release(args);
        }
        else
        {
            //other work can be done here; messages published meanwhile are not lost
        }
    }
}

rt_thread_t t_wait = rt_thread_create("msg_wt", msg_wait_thread_entry, RT_NULL, 512, 17, 10);
rt_thread_startup(t_wait);

Receive several specified messages at once:

static void msg_wait_any_thread_entry(void *params)
{
    rt_err_t rst;
    task_msg_args_t args = RT_NULL;
    const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3};
    //create a multi-message subscriber
    int subscriber_id = task_msg_subscriber_create2(name_list, sizeof(name_list)/sizeof(enum task_msg_name));
    if(subscriber_id < 0) return;

    while(1)
    {
        rst = task_msg_wait_until(subscriber_id, 50, &args);
        if(rst==RT_EOK)
        {
            if(args->msg_name==TASK_MSG_OS_REDAY)
            {
                LOG_D("[task_msg_wait_any]:TASK_MSG_OS_REDAY => msg_obj is null:%s", args->msg_obj==RT_NULL ? "true" : "false");
            }
            else if(args->msg_name==TASK_MSG_NET_REDAY)
            {
            #ifdef TASK_MSG_USING_JSON
                cJSON *root = cJSON_Parse(args->msg_obj);
                if(root)
                {
                    int net_reday, id;
                    cJSON_item_get_number(root, "net_reday", &net_reday);
                    cJSON_item_get_number(root, "id", &id);
                    const char *ip = cJSON_item_get_string(root, "ip");
                    LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => net_reday:%s, ip:%s, id:%d", (net_reday==0 ? "false" : "true"), ip, id);
                    cJSON_Delete(root);
                }
            #else
                LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_obj:%s", args->msg_name, args->msg_obj);
            #endif
            }
            else if(args->msg_name==TASK_MSG_2)
            {
                struct msg_2_def *msg_2 = (struct msg_2_def *)args->msg_obj;
                LOG_D("[task_msg_wait_any]:TASK_MSG_2 => msg_2.id:%d, msg_2.name:%s", msg_2->id, msg_2->name);
            }
        #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
            else if(args->msg_name==TASK_MSG_3)
            {
                struct msg_3_def *msg_3 = (struct msg_3_def *)args->msg_obj;
                LOG_D("[task_msg_wait_any]:TASK_MSG_3 => msg_3.id:%d, msg_3.name:%s", msg_3->id, msg_3->name);
                LOG_HEX("[task_msg_wait_any]:TASK_MSG_3 => msg_3.buffer", 16, msg_3->buffer, msg_3->buffer_size);
            }
        #endif
            rt_thread_mdelay(200);//simulate a time-consuming operation; messages published meanwhile are not lost
            //release the message
            task_msg_release(args);
        }
        else
        {
            //other work can be done here; messages published meanwhile are not lost
        }
    }
}

rt_thread_t t_wait_any = rt_thread_create("msg_wa", msg_wait_any_thread_entry, RT_NULL, 1024, 16, 10);
rt_thread_startup(t_wait_any);
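
The comments in the code above state that messages published while a subscriber is busy are not lost, which implies that each subscriber has some form of queue. The following single-threaded sketch shows one way that could work: a subscriber holds a topic bitmask and a small ring buffer, and publishing enqueues into it when the topic matches. All `demo_` names and the queue depth are hypothetical. The package's real data structures, locking, and overflow behaviour are not described in this README.

```c
#include <stddef.h>

enum demo_msg_name { DEMO_MSG_A = 0, DEMO_MSG_B, DEMO_MSG_C, DEMO_MSG_COUNT };

#define DEMO_QUEUE_DEPTH 4 /* arbitrary depth for the sketch */

struct demo_subscriber {
    unsigned topics;                            /* bit n set: subscribed to name n */
    enum demo_msg_name queue[DEMO_QUEUE_DEPTH]; /* pending message names */
    size_t head;                                /* index of the oldest entry */
    size_t count;                               /* number of queued entries */
};

/* Subscribes s to every name in the list and empties its queue. */
static void demo_subscriber_init(struct demo_subscriber *s,
                                 const enum demo_msg_name *names, size_t n)
{
    s->topics = 0;
    s->head = 0;
    s->count = 0;
    for (size_t i = 0; i < n; i++)
        s->topics |= 1u << names[i];
}

/* Returns 1 if queued, 0 if not subscribed, -1 if the queue is full. */
static int demo_publish_to(struct demo_subscriber *s, enum demo_msg_name name)
{
    if ((s->topics & (1u << name)) == 0)
        return 0;
    if (s->count == DEMO_QUEUE_DEPTH)
        return -1;
    s->queue[(s->head + s->count) % DEMO_QUEUE_DEPTH] = name;
    s->count++;
    return 1;
}

/* Non-blocking stand-in for a wait: 0 and the oldest message, or -1 if empty. */
static int demo_poll(struct demo_subscriber *s, enum demo_msg_name *out)
{
    if (s->count == 0)
        return -1;
    *out = s->queue[s->head];
    s->head = (s->head + 1) % DEMO_QUEUE_DEPTH;
    s->count--;
    return 0;
}
```

Messages come out in the order they were published, regardless of how long the consumer spent on the previous one.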

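4. Notes

  • Do not perform time-consuming operations in a subscription callback. If a message needs lengthy processing, handle it in a separate thread with task_msg_wait_until instead.

  • If a struct message contains a pointer field whose memory is dynamically allocated, you must set a release hook that frees that memory; otherwise memory will leak.

  • When receiving messages with task_msg_wait_until, call task_msg_release to release the message only when the function has returned RT_EOK (do not call task_msg_release in any other case). The message is actually freed from memory only after every subscriber to it has released it.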
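
The last note describes reference-counted ownership: one copy of the message is shared by all subscribers and is freed when the last of them releases it. A minimal sketch of that idea in plain C follows. `demo_msg`, `demo_msg_create`, and `demo_msg_release` are hypothetical names, `malloc`/`free` stand in for the RT-Thread allocator, and a real implementation would also have to protect the counter against concurrent access.

```c
#include <stdlib.h>
#include <string.h>

struct demo_msg {
    int refs;      /* subscribers that still hold the message */
    size_t size;   /* payload size in bytes */
    void *payload; /* single shared copy of the message body */
};

static int demo_frees; /* number of messages actually freed (demo only) */

/* Copies the payload once; subscribers (> 0) becomes the initial count. */
static struct demo_msg *demo_msg_create(const void *data, size_t size,
                                        int subscribers)
{
    struct demo_msg *m = malloc(sizeof(*m));
    if (m == NULL)
        return NULL;
    m->payload = malloc(size);
    if (m->payload == NULL) {
        free(m);
        return NULL;
    }
    memcpy(m->payload, data, size);
    m->size = size;
    m->refs = subscribers;
    return m;
}

/* Drops one reference and frees the message on the last one.
 * Returns the number of references that remain. */
static int demo_msg_release(struct demo_msg *m)
{
    int left = --m->refs;
    if (left == 0) {
        free(m->payload);
        free(m);
        demo_frees++;
    }
    return left;
}
```

Under this scheme, releasing a message twice from one subscriber or releasing after a failed wait would unbalance the count, which matches the warning to release only after RT_EOK.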

5. Contact & Thanks

