鼎鼎知识库
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

01AntJob的运作原理.md 3.9KB

表结构

//每一个应用实例就是一个App
public class App
{
	public int ID {get;set;}
	publi string DisplayName {get;set;}
	public string Secret {get;set;}	
}

//描述App的工作状态
public class AppOnline
{
	public int ID {get;set;}
	public string Instantce {get;set;}
	public string Client {get;set;}
	public string Name {get;set;}
	public int ProcessId {get;set;}
	public string Version {get;set;}
	public string Server {get;set;}
	public int Tasks {get;set;}
	public int Total {get;set;}
}

//作业
public class Job
{
	public int ID {get;set;}
	public int AppID {get;set;}
	public string Name {get;set;}
	public string ClassName {get;set;}
	public string DisplayName {get;set;}
	public int Mode {get;set;}
	public string Topic {get;set;}
	public int MessageCount {get;set;}
	public DateTime Start {get;set;}
	public DateTime End {get;set;}
	public int Step {get;set;}
	public int BatchSize {get;set;}
	public int Offset {get;set;}	
}

//作业的任务
public class JobTask
{
	public int ID {get;set;}
	public int AppID {get;set;}
	public int JobID {get;set;}
}

应用、调度中心、Web控制台

  • 计算应用:理解成一个应用实例,可能是本地开发环境的控制台程序,可能是服务器上的一个服务
  • 调度中心:调度所有的作业,需要被持久化
  • Web控制台:展示调度中心的持久化内容

最简单的例子

应用要做的事情是把作业交给调度器开始执行。

            var set = AntSetting.Current;

            // 实例化调度器
            var sc = new Scheduler();

            // 使用分布式调度引擎替换默认的本地文件调度
            sc.Provider = new NetworkJobProvider
            {
                Server = set.Server,
                AppID = set.AppID,
                Secret = set.Secret,
            };

            // 添加作业处理器
            sc.Handlers.Add(new HelloJob());
            sc.Handlers.Add(new BuildPatient());
            sc.Handlers.Add(new BuildWill());

            // 启动调度引擎,调度器内部多线程处理
            sc.Start();

所有的作业都是Handler类型。

	[DisplayName("定时欢迎")]
    [Description("简单的定时任务")]
    internal class HelloJob : Handler
    {
        public HelloJob()
        {
            // 今天零点开始,每10秒一次
            var job = Job;
            job.Start = DateTime.Today;
            job.Step = 10;
        }

        protected override Int32 Execute(JobContext ctx)
        {
            // 当前任务时间
            var time = ctx.Task.Start;
            WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);

            // 成功处理数据量
            return 1;
        }
    }

antjob1

Scheduler.Start()做了什么?

public class Scheduler
{
	public List<Handler> Handlers { get; } = new List<Handler>();
	public IJobProvider Provider { get; set; }//作业提供者
	
	public void Start()
	{
		var prv = Provider;
		prv.Schedule = this;
		prv.Start();//作业提供者开始
		
		var jobs = prv.GetJobs();
		
		var hs = Handlers;
		foreach (var handler in hs)
		{
			handler.Schedule = this;
			handler.Provider = prv;
			
			var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
			if(job!=null && job.Mode ==0) job.Mode = handler.Mode;
			handler.Job = job;
			
			handler.Start();//作业处理器开始
		}
		
		if(Period > 0) _timer = new TimeX(Loop, null, Period * 1000, "Job"){Async = true};
	}
}

Scheduler中维护着Handler的集合以及IJobProviderIJobProvider需要知道SchedulerHandler需要知道SchedulerIJobProvider以及IJobSchedulerIJobProvider和每个Handler开始工作。更加重要的是,在实例化TimeX的过程中开始了任务调度,即Scheudler中的Process()

Pr