阿里妹导读
概述
走近iLogtail
iLogtail架构发展历程
单一文件采集阶段
只能采集日志文件;
假定日志为单一格式,每种格式的日志仅支持一种处理方式(如正则解析、Json解析等);
只能将日志发送至日志服务;
完全由C++实现,在日志采集方面具有显著优势;
由于需求单一,因此整体架构偏向于单体架构,代码设计以面向过程为主,类的功能划分不明确,多个模块使用同一个类对象,导致类间依赖严重,可扩展性较差;
功能实现与日志服务相关概念(如LogGroup和Logstore等)强绑定,普适性较差;
Golang插件扩展阶段
多样化的数据输入输出选项
个性化的数据处理能力组合
高性能的数据处理吞吐能力
每个采集配置对应一条完整的流水线,各个流水线之间的资源互相独立,互不影响;
每条流水线支持多个输入和输出,同时支持从C++主程序中接收数据及向C++主程序发送数据;
每条流水线支持多个处理插件级联,有效提升处理能力;
插件系统本身具备配置管理能力,支持配置的热加载,可独立于C++主程序进行工作。
采集日志文件并使用C++的处理能力,最后将数据投递至日志服务SLS(1和4号组合);
采集日志文件并使用Golang插件进行处理,最后将数据投递至日志服务SLS(2和4号组合);
采集日志文件并使用Golang插件进行处理,最后将数据投递至第三方存储(2和5号组合);
采集其它输入(如syslog)并使用Golang插件进行处理,最后将数据投递至日志服务SLS(3和4号组合);
采集其它输入(如syslog)并使用Golang插件进行处理,最后将数据投递至第三方存储(3和5号组合)。
C++和Golang多语言实现,C++部分拥有性能优势,Golang部分拥有功能优势;
支持多样化输入和输出选项;
数据处理能力有一定提升,但输入输出与处理模块间的组合能力存在多种限制:
C++部分原生的高性能处理能力仍然局限于采集日志文件并投递至日志服务的场景使用;
C++部分的高性能处理能力无法与插件系统的多样化处理能力相结合,二者只能选其一,从而降低了复杂日志处理场景的性能。
为什么要重构?
由于C++主程序代码存在错综复杂的类间依赖关系,导致开发难度极大,加之C++的处理能力无法被社区所使用,因此C++主程序的开发几乎无人问津。
不论是C++主程序还是Golang插件系统,其内部的数据交互模型只适用于可观测数据中的Log,而无法表达Metric和Trace。除此以外,这些数据结构均针对SLS而设计,导致在向第三方存储系统投递数据时,必须进行额外的数据结构转换,从而降低整体的性能。
碍于C++主程序代码错综复杂的类间依赖关系,商业版代码与开源版的剥离只能采用非常原始和丑陋的文件替换方式。这种操作直接导致如下两个结果:
开源版代码中存在大量意义不明的无用空函数;
在进行商业版代码开发时,首先需要进行文件替换,从而容易引入开源版和商业版代码的不一致,对联调联测带来诸多不便,影响开发和发布效率。
目标
将iLogtail的内部数据模型更换为通用数据模型,以减少数据投递时不必要的数据结格式转换;
将C++主程序的输入、处理和输出能力全面插件化,便于从产品侧统一C++部分和Golang部分的插件概念;
在C++主程序中增加可观测流水线的概念,强化C++主程序的流水线配置管理能力,以支持C++处理能力间的级联和C++处理能力与Golang处理能力的组合,从而增强C++的主体地位;
统一商业版和开源版的采集配置格式,均采用流水线模式的配置结构,以适应最新的iLogtail架构;
优化采集配置热加载的方式,提升配置容错能力;
优化商业版代码嵌入开源版代码的路径,通过仅追加文件而非切换文件的方式来实现,提升开发效率。
实践
数据模型通用化
message Log
{
required uint32 Time = 1;
message Content
{
required string Key = 1;
required string Value = 2;
}
repeated Content Contents= 2;
repeated string values = 3;
optional fixed32 Time_ns = 4;
}
message LogTag
{
required string Key = 1;
required string Value = 2;
}
message LogGroup
{
repeated Log Logs= 1;
optional string Category = 2;
optional string Topic = 3;
optional string Source = 4;
optional string MachineUUID = 5;
repeated LogTag LogTags = 6;
}
可以看到,每个LogGroup包含若干Log以及LogTag,以及其他一些元信息。显然,使用这个数据结构作为iLogtail内部的数据模型是有所不足的:
如LogGroup这个名字所示,该数据结构仅适用于表达可观测数据中的Log,而无法表达Metric和Trace,缺乏普适性;
LogGroup是一个PB,应当只是在最终发送数据时使用,而不适合作为通用的内存数据模型。另一方面,这个PB只适用于SLS,并不适用于其他第三方存储。因此,在往第三方存储发送数据时,需要额外进行数据格式转换,降低采集效率。
支持表达可观测数据的所有类型,包括Log、Metric和Trace,提升数据结构的普适性;
发送模块可根据自身需要,选择不同的协议对通用数据结构进行序列化,提升发送协议的灵活性和性能。
PipelineEventGroup
mEvents:一组事件;
mMetadata:EventGroup共享的元信息,例如机器ip、容器名称、日志路径等;仅在生成EventGroup时可写,且保存于内存中,不用于最终输出;
mTags:EventGroup共享的tag,与原有架构中的LogTag相对应,用于保存mMetadata中用户需要实际输出的信息,仅在tag处理插件中可写;
mSourceBuffer:EventGroup共享的内存分配器,所有成员变量涉及的内存分配均需由该分配器分配。
PipelineEvent
PipelineEventPtr
插件抽象
class Plugin {
public:
virtual ~Plugin() = default;
virtual const std::string& Name() const = 0;
// other setters && getters
protected:
PipelineContext* mContext = nullptr;
};
其中,成员变量mContext指向插件所属流水线(Pipeline)的上下文信息(具体含义将在下文介绍),成员函数Name()返回该插件的名字。
处理插件
接口定义
class Processor : public Plugin {
public:
virtual ~Processor() {}
virtual bool Init(const Json::Value& config) = 0;
virtual void Process(std::vector<PipelineEventGroup>& logGroupList);
protected:
virtual bool IsSupportedEvent(const PipelineEventPtr& e) const = 0;
virtual void Process(PipelineEventGroup& logGroup) = 0;
};
Init函数:负责根据采集配置实例化插件,并返回是否成功实例化; Process函数:负责对输入的每一个PipelineEventGroup进行处理,并将处理结果通过同一变量返回。
原有能力抽象
显然,将日志文件读取和日志解析的能力统一放到一个类中是一个不太合理的设计,完全缺乏可扩展性。为此,我们需要将日志切分(LogSplit函数)和日志解析(ParseLogLine函数)的能力从LogFileReader类中剥离开来,同时和LogParser类中相关的函数进行重新组合,从而形成多个独立的处理插件。
ProcessorSplitLogStringNative:日志切分处理插件,用于对日志块按照指定分隔符进行切分生成多个事件;
ProcessorSplitRegexNative:日志切分处理插件,用于对日志块按照正则表达式进行切分生成多个事件;
ProcessorParseRegexNative:正则解析插件,通过正则匹配解析事件指定字段内容并提取新字段;
ProcessorParseJsonNative:Json解析插件,解析事件中Json格式字段内容并提取新字段;
ProcessorParseDelimiterNative:分隔符解析插件,解析事件中分隔符格式字段内容并提取新字段;
ProcessorParseTimestampNative:时间解析插件,用于解析事件中记录时间的字段,并将结果置为事件的__time__字段;
ProcessorFilterRegexNative:事件过滤插件,用于根据事件字段内容来过滤事件;
ProcessorDesensitizeNative:脱敏插件,用于对事件的字段内容进行脱敏;
ProcessorTagNative:tag处理插件,用于将PipelineEventGroup的mMetadata成员选择性地加入mTag成员用于最终输出,同时支持对tag的key进行重命名。
输入插件
接口定义
class Input : public Plugin {
public:
virtual ~Input() = default;
virtual bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) = 0;
virtual bool Start() = 0;
virtual bool Stop(bool isPipelineRemoving) = 0;
};
Init函数:负责根据采集配置实例化插件,并返回是否成功实例化以及可能的Golang流水线组件;
Start函数:启动输入插件;
Stop函数:根据流水线是否即将被移除,采取不同的策略停止输入插件;
原有能力抽象
bool InputFile::Start() {
if (!FileServer::GetInstance()->IsRunning()) {
FileServer::GetInstance()->Start();
}
if (mEnableContainerDiscovery) {
mFileDiscovery.SetContainerInfo(
FileServer::GetInstance()->GetAndRemoveContainerInfo(mContext->GetPipeline().Name()));
}
FileServer::GetInstance()->AddFileDiscoveryConfig(mContext->GetConfigName(), &mFileDiscovery, mContext);
FileServer::GetInstance()->AddFileReaderConfig(mContext->GetConfigName(), &mFileReader, mContext);
FileServer::GetInstance()->AddMultilineConfig(mContext->GetConfigName(), &mMultiline, mContext);
FileServer::GetInstance()->AddExactlyOnceConcurrency(mContext->GetConfigName(), mExactlyOnceConcurrency);
return true;
}
bool InputFile::Stop(bool isPipelineRemoving) {
if (!FileServer::GetInstance()->IsPaused()) {
FileServer::GetInstance()->Pause();
}
if (!isPipelineRemoving && mEnableContainerDiscovery) {
FileServer::GetInstance()->SaveContainerInfo(mContext->GetPipeline().Name(), mFileDiscovery.GetContainerInfo());
}
FileServer::GetInstance()->RemoveFileDiscoveryConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveFileReaderConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveMultilineConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveExactlyOnceConcurrency(mContext->GetConfigName());
return true;
}
如果文件采集总线程未启动,则调用FileServer类的Start函数启动线程;
将插件相关配置注册到FileServer类中。
如果文件采集线程未暂停,则调用FileServer类的Stop函数暂停全局文件采集;
将插件相关配置从FileServer类中删除。
可扩展性
输出插件
接口定义
class Flusher : public Plugin {
public:
virtual ~Flusher() = default;
virtual bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) = 0;
virtual bool Start() = 0;
virtual bool Stop(bool isPipelineRemoving) = 0;
};
Init函数:负责根据采集配置实例化插件,并返回是否成功实例化以及可能的Golang流水线组件;
Start函数:启动输出插件;
Stop函数:根据流水线是否即将被移除,采取不同的策略停止输出插件;
原有能力抽象
bool FlusherSLS::Start() {
SLSSender::Instance()->IncreaseProjectReferenceCnt(mProject);
SLSSender::Instance()->IncreaseRegionReferenceCnt(mRegion);
SLSSender::Instance()->IncreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}
bool FlusherSLS::Stop(bool isPipelineRemoving) {
SLSSender::Instance()->DecreaseProjectReferenceCnt(mProject);
SLSSender::Instance()->DecreaseRegionReferenceCnt(mRegion);
SLSSender::Instance()->DecreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}
可扩展性
流水线抽象
统一C++主程序和Golang插件系统的流水线,加强C++主程序的主体地位;
支持C++处理能力的级联,极大地提升C++部分对于复杂日志的处理能力;
便于C++插件和Golang插件的组合,从而提供更灵活的插件编排能力,同时从产品层面提供更加统一的视图。
插件编排
从产品层面,扩展处理插件仅起到辅助作用,仅在单纯使用原生处理插件无法满足处理需求时使用。因此,扩展处理插件相对于原生处理插件而言是补充和追加关系,而非对等关系。
从架构层面,毕竟原生处理插件和扩展处理插件分别由不同的语言实现,二者之间的交互必须通过CGO接口来完成。从性能角度,应当尽可能避免频繁的CGO接口调用,因此在处理阶段,只允许数据单向地从C++主程序流向Golang插件系统。
根据以上描述,在新架构中,数据的可能通路如下所示:
数据通路应尽可能多地经过C++组件;
数据通路应当尽可能减少CGO接口;
由于Golang插件系统的运行也是以流水线的形式进行的,并不能以插件的形式单独存在,因此我们重新定义Golang流水线为流水线的子流水线。从上图中也能看到,Golang子流水线可能有两种形式:
包含输入插件:如2、3、5组合或者2、4组合;
不包含输入插件:如1、3、4组合。
插件实例
可以看到,每个插件实例都有一个mId成员用于唯一标识一个插件实例,以及一个mPlugin成员指向真正的插件。
反馈队列
LogStore是SLS特有的概念,对于往第三方存储发送数据的场景(例如开源场景),没有Logstore的概念,只能默认所有的配置共用一个Logstore,即所有配置共用一个缓冲队列。这显然是不合理的。
即便是往日志服务投递数据的场景,由于一个Logstore包含多个采集配置,因此难以通过反馈队列实现配置级的资源管控。
与Golang插件系统类似,每个流水线拥有一个独立的处理队列;
对于发送队列,从Process线程的角度看,每个发送插件拥有一个发送队列。但实际上这个发送队列内部可进一步包含多个子队列,例如SLS输出插件仍然保持每个Logstore一个发送队列;
发送队列与处理队列之间的反馈不再是一对一的关系,而是改成多对一的关系。
由于流水线的资源管控往往只需要对流水线的源头进行控制即可,因此处理队列保持以流水线为粒度能够保证流水线资源控制的正常进行,同时还便于对流水线进行优先级的区分。
由于不同的发送服务端有着不同的资源管控粒度(例如SLS对Logstore的流量有限制),但这些细节对于Process线程来说没有意义。因此,通过设计模式中的代理模式(Proxy)保持一个发送插件实例一个逻辑上的发送队列能够最大程度简化类间交互,增强可扩展性,同时降低内存使用。
运用设计模式中的观察者模式(Observer)有助于提升反馈队列交互的可扩展性。
流水线定义
class Pipeline {
public:
bool Init(Config&& config);
void Start();
void Process(std::vector<PipelineEventGroup>& logGroupList);
void Stop(bool isRemoving);
// other getters & setters
private:
std::string mName;
std::vector<std::unique_ptr<InputInstance>> mInputs;
std::vector<std::unique_ptr<ProcessorInstance>> mProcessorLine;
std::vector<std::unique_ptr<FlusherInstance>> mFlushers;
Json::Value mGoPipelineWithInput;
Json::Value mGoPipelineWithoutInput;
FeedbackQueue<PipelineEventGroup> mProcessQueue;
mutable PipelineContext mContext;
std::unique_ptr<Json::Value> mConfig;
// other private members
};
Init函数:根据采集配置进行插件编排,实例化所有的C++插件,并加载可能存在的Golang子流水线;
Start函数:按照从输出到输出的顺序(即数据通路图中的5至1顺序)依次启动各个组件;
Process函数:按顺序使用C++插件对输入的PipelineEventGroup列表进行处理;
Stop函数:按照从输入到输出的顺序(即数据通路图中的1至5顺序)依次停止各个组件。
mName:流水线的名字,与采集配置名相同;
mInputs:C++输入插件实例列表;
mProcessors:原生处理插件实例列表;
mflushers:C++输出插件实例列表;
mGoPipelineWithInput:包含输入插件的Golang子流水线,可选;
mGoPipelineWithoutInput:不包含输入插件的Golang子流水线,可选;
mContext:流水线上下文;
mProcessQueue:当前流水线的处理队列;
mConfig:采集配置的原始内容。
mConfigName:流水线的名称;
mGlobalConfig:流水线级别的配置,由采集配置给出;
mPipeline:指向当前流水线的指针;
mLogger和mAlarm:用于打印日志和发送告警的全局组件。
采集配置管理优化
配置格式
商业版配置:采用平铺结构,没有任何层次,且仅支持JSON格式
{
"aliuid": "1234567890",
"category": "test_logstore",
"create_time": 1693370409,
"defaultEndpoint": "cn-shanghai-intranet.log.aliyuncs.com",
"delay_alarm_bytes": 0,
"delay_skip_bytes": 0,
"discard_none_utf8": false,
"discard_unmatch": true,
"docker_exclude_env": {},
"docker_exclude_label": {},
"docker_file": false,
"docker_include_env": {},
"docker_include_label": {},
"enable": true,
"enable_tag": false,
"file_encoding": "utf8",
"file_pattern": "*.log",
"filter_keys": [],
"filter_regs": [],
"group_topic": "aaaaaaab",
"keys": [
"k1,k2"
],
"local_storage": true,
"log_begin_reg": ".*",
"log_path": "/home",
"log_type": "common_reg_log",
"log_tz": "",
"max_depth": 10,
"max_send_rate": -1,
"merge_type": "topic",
"preserve": true,
"preserve_depth": 1,
"priority": 0,
"project_name": "test-project",
"raw_log": false,
"regex": [
"(\\d+)x(.*)"
],
"region": "cn-shanghai",
"send_rate_expire": 0,
"sensitive_keys": [],
"tail_existed": false,
"timeformat": "",
"topic_format": "none",
"tz_adjust": false,
"version": 3
}
开源版配置:采用流水线结构,有较好的层次,但只支持YAML格式
inputs:
Type: file_log
LogPath: /home
FilePattern: '*.log'
MaxDepth: 10
processors:
Type: processor_regex_accelerate
Regex: '(\\d+)x(.*)'
Keys: ["k1", "k2"]
flushers:
Type: flusher_sls
ProjectName: test-project
LogstoreName: test_logstore
Endpoint: cn-shanghai-intranet.log.aliyuncs.com
其中,inputs、processors、aggregators和flushers中可包含任意数量的插件,包括C++插件和Golang插件。
配置文件组织
商业版管控端下发的配置为一个文件存放所有的采集配置,文件格式仅支持JSON,默认位置为/usr/local/ilogtail/user_log_config.json;
本地商业版配置既支持一个文件一个采集配置,也支持一个文件多个采集配置,文件格式仅支持JSON,默认存放位置为/etc/ilogtail/user_config.d目录和user_local_config.json;
本地开源版配置仅支持一个文件一个采集配置,文件格式仅支持YAML,默认存放位置为/etc/ilogtail/user_yaml_config.d目录;
开源版管控端下发的配置为一个文件一个采集配置,文件格式仅支持YAML,默认存放位置为/etc/ilogtail/remote_yaml_config.d目录;
每个文件存放一个采集配置,文件名即为采集配置名;
文件名后缀标识文件格式,支持json和yaml(或yml);
同一来源的采集配置放在同一个目录下,默认存放位置为/etc/ilogtail/config/<source>,其中<source>代表来源,目前包括:
商业版管控端下发的配置:enterprise
开源版管控端下发的配置:common
本地:local
配置热加载
class ConfigWatcher {
public:
static ConfigWatcher* GetInstance();
ConfigDiff CheckConfigDiff();
void AddSource(const std::string& dir, std::mutex* mux = nullptr);
private:
std::vector<std::filesystem::path> mSourceDir;
std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mFileInfoMap;
// other members
};
AddSource函数:向mSourceDir注册新的需要监控的存放采集配置的目录; CheckConfigDiff函数:检查所有被监控目录的采集配置文件是否有改变,返回新增、删除和存在修改的配置(记录在ConfigDiff结构体中),并在mFileInfoMap中更新最新的文件状态。
当CheckConfigDiff函数返回非空,则会进一步调用PipelineManager类的UpdatePipelines函数将配置加载成实际的流水线:
void logtail::PipelineManager::UpdatePipelines(ConfigDiff& diff) {
for (const auto& name : diff.mRemoved) {
mPipelineNameEntityMap[name]->Stop(true);
mPipelineNameEntityMap.erase(name);
}
for (auto& config : diff.mModified) {
auto p = BuildPipeline(std::move(config));
if (!p) {
continue;
}
mPipelineNameEntityMap[config.mName]->Stop(false);
mPipelineNameEntityMap[config.mName] = p;
p->Start();
}
for (auto& config : diff.mAdded) {
auto p = BuildPipeline(std::move(config));
if (!p) {
continue;
}
mPipelineNameEntityMap[config.mName] = p;
p->Start();
}
}
可以看到,采用上述两步走的配置热加载方法,可以最大程度提升流水线的容错能力,即仅当采集配置对应的流水线完全合法时才会进行加载。对于正在运行的流水线,如果因为某些原因导致对应的采集配置文件非法,则目前正在运行的流水线仍会继续正常运行,不会被非法的采集配置影响。
远程配置下发
class ConfigProvider {
public:
virtual void Init(const std::string& dir);
virtual void Stop() = 0;
protected:
std::filesystem::path mSourceDir;
mutable std::mutex mMux;
};
Init函数:执行初始化操作,创建mSourceDir目录并调用ConfigWatcher类的AddSource函数注册目录,同时启动线程定时拉取远端配置。
Stop函数:停止ConfigProvider。
商业版管控端配置拉取:EnterpriseConfigProvider类;
开源版管控端配置拉取:CommonConfigProvider类。
进程配置管理优化
AppConfig类无限增长,内部缺乏有效组织,时间一久便难以维护;
几乎所有模块都要通过AppConfig类来获取参数,因此代码中存在大量的AppConfig::GetInstance()函数,造成代码冗余和阅读不便。
有一些参数仅在商业版中使用,导致AppConfig类需要维护开源版和商业版两份,增加出现不一致的概率。
商业版代码嵌入方式优化
商业版独有的功能:
组成单独的类放在单独的文件中,直接追加到开源版的目录中;
对于公共文件中的调用点,使用__ENTERPRISE__宏来控制开源和商业版的编译行为;
ShennongManager::GetInstance()->Pause();
// ...
ShennongManager::GetInstance()->Resume();
商业版和开源版行为存在差异:
尽可能使用单例模式;
将开源版的类作为基类,然后将类中行为不同的方法声明为虚函数;
将商业版的类作为开源类的派生类,并重写虚函数;
在GetInstance函数中使用__ENTERPRISE__宏和指向基类的指针来控制实际生效的类;
将商业版文件直接追加到开源版的目录中;
class ProfileSender {
public:
static ProfileSender* GetInstance();
virtual void SendToProfileProject(const std::string& region, sls_logs::LogGroup& logGroup);
// other members
};
ProfileSender* ProfileSender::GetInstance() {
static ProfileSender* ptr = new EnterpriseProfileSender();
static ProfileSender* ptr = new ProfileSender();
return ptr;
}
将与商业版配置拉取相关的代码从ConfigManager类和EventDispatcher类中剥离出来,重新组成EnterpriseConfigProvider类;
将与商业版鉴权相关的代码从ConfigManager类中剥离出来,移动到EnterpriseSLSControl类中;
将与商业版可观测数据发送相关的代码从ConfigManager类中剥离出来,移动到EnterpriseProfileSender类中;
将商业版特殊的指标监控代码从EventDispatcher类中剥离出来,重新组成ShennongManager类;
将与商业版相关的非配置级参数从AppConfig类中剥离出来,分别移动到上述新建立的类中。
option(ENABLE_ENTERPRISE "enable enterprise feature")
if (ENABLE_ENTERPRISE)
add_definitions(-D__ENTERPRISE__)
include(${CMAKE_CURRENT_SOURCE_DIR}/enterprise_options.cmake)
else ()
include(${CMAKE_CURRENT_SOURCE_DIR}/options.cmake)
endif ()
思考
新架构应该如何设计?
对于可观测流水线的通用概念(如数据类型和流水线定义等),iLogtail要尽量做到和领域内其它竞品保持一致,避免独树一帜给用户迁移带来不便和困惑;
对于架构实现,不能简单照搬其他主流可观测数据采集器的架构,而是在吸收其设计思想的前提下,针对iLogtail自身的特点(如双语言实现)进行原创设计,适合自己的才是最好的。
对于iLogtail的自身优势(如C++的高性能和配置热加载),在完整保留的同时,还需要将原本阻止优势发挥的限制尽可能地去除,使得自身优势能够在更多的场景中发挥作用,提升产品的核心竞争力。
iLogtail原有的测试体系不健全,如何保证重构后的代码不引入兼容性问题?
每个类具体负责的功能,为后续类合并和重构奠定基础;
类间依赖,尤其是相关参数在多个类内使用的情况;
不常用的功能点,了解其预期行为,从而为补充UT作准备。
如何保证代码质量?
总结