Sunday, August 18, 2024

Distributed Task Runner

 Distributed Task Runner


Manager

startup.cs

using ManagerNode;

using Microsoft.AspNetCore.Builder;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;

using Quartz;

using Quartz.Impl;

using Quartz.Spi;


/// <summary>
/// Configures the manager node: MVC controllers, the job scheduling and publishing
/// services, and the hosted service that runs the Quartz scheduler.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers the manager's services with the dependency-injection container.
    /// </summary>
    /// <param name="services">The service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        services
            // Application services.
            .AddSingleton<JobSchedulerService>()
            .AddSingleton<RabbitMqJobPublisher>()
            // Quartz infrastructure: job factory and scheduler factory.
            .AddSingleton<IJobFactory, SingletonJobFactory>()
            .AddSingleton<ISchedulerFactory, StdSchedulerFactory>()
            // The job type itself, so the job factory can resolve it from the container.
            .AddSingleton<DistributedQuartzJob>()
            // Hosted service that obtains and starts the scheduler.
            .AddHostedService<QuartzHostedService>();
    }

    /// <summary>
    /// Builds the HTTP request pipeline.
    /// </summary>
    /// <param name="app">The application builder the middleware is added to.</param>
    /// <param name="env">The hosting environment, used to enable the developer exception page.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        var isDevelopment = env.IsDevelopment();
        if (isDevelopment)
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(routes => routes.MapControllers());
    }
}


/// <summary>
/// Quartz job factory that resolves job instances from the dependency-injection container
/// instead of letting Quartz construct them itself.
/// </summary>
public class SingletonJobFactory : IJobFactory
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the factory.
    /// </summary>
    /// <param name="serviceProvider">The container that job instances are resolved from.</param>
    public SingletonJobFactory(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Resolves the job instance for the trigger that just fired.
    /// </summary>
    /// <param name="bundle">Describes the fired trigger; its job detail supplies the job type to resolve.</param>
    /// <param name="scheduler">The scheduler requesting the job (unused).</param>
    /// <returns>The job instance registered in the container for the requested job type.</returns>
    /// <exception cref="InvalidOperationException">
    /// The job type is not registered in the container, or the registered service does not implement <see cref="IJob"/>.
    /// </exception>
    public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
    {
        var jobType = bundle.JobDetail.JobType;

        // Previously an `as IJob` cast returned null here when the type was not registered,
        // which only surfaced later as an opaque failure; fail fast with a clear message instead.
        if (_serviceProvider.GetService(jobType) is IJob job)
        {
            return job;
        }

        throw new InvalidOperationException(
            $"Job type '{jobType.FullName}' is not registered in the service container or does not implement IJob.");
    }

    /// <summary>
    /// Intentionally does nothing: job instances come from the container, so this factory does not dispose them.
    /// </summary>
    /// <param name="job">The job Quartz has finished with.</param>
    public void ReturnJob(IJob job) { }
}


/// <summary>
/// Hosted service that ties the Quartz scheduler's lifetime to the application's:
/// the scheduler is obtained and started on host start and shut down on host stop.
/// </summary>
public class QuartzHostedService : IHostedService
{
    private readonly ISchedulerFactory _schedulerFactory;
    private readonly IJobFactory _jobFactory;
    private readonly JobSchedulerService _jobSchedulerService;

    // Set by StartAsync; stays null if the host is stopped before it ever started.
    private IScheduler _scheduler;

    /// <summary>
    /// Creates the hosted service.
    /// </summary>
    /// <param name="schedulerFactory">Factory the scheduler is obtained from.</param>
    /// <param name="jobFactory">Job factory installed on the scheduler before it starts.</param>
    /// <param name="jobSchedulerService">Stored but not used by this class in the visible code.</param>
    public QuartzHostedService(ISchedulerFactory schedulerFactory, IJobFactory jobFactory, JobSchedulerService jobSchedulerService)
    {
        _schedulerFactory = schedulerFactory;
        _jobFactory = jobFactory;
        _jobSchedulerService = jobSchedulerService;
    }

    /// <summary>
    /// Obtains the scheduler, installs the job factory on it and starts it.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the Quartz calls.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
        _scheduler = scheduler;
        scheduler.JobFactory = _jobFactory;

        // Jobs that must be scheduled at startup can be added here, before the scheduler starts.
        await scheduler.Start(cancellationToken);
    }

    /// <summary>
    /// Shuts the scheduler down if it was ever obtained.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the Quartz shutdown call.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_scheduler == null)
        {
            return;
        }

        await _scheduler.Shutdown(cancellationToken);
    }
}


program.cs


/// <summary>
/// Entry point of the manager node.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds the host and runs it until shutdown.
    /// </summary>
    /// <param name="args">Command-line arguments passed through to the host builder.</param>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates the default host builder with the web host configured to use <see cref="Startup"/>.
    /// </summary>
    /// <param name="args">Command-line arguments.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureWebHostDefaults(web => web.UseStartup<Startup>());
    }
}


JobSchedulerService.cs

using ManagerNode;

using Quartz;

using Quartz.Impl;

using RabbitMQ.Client;

using System.Text;


/// <summary>
/// Schedules <see cref="DistributedQuartzJob"/> executions on the Quartz scheduler.
/// </summary>
public class JobSchedulerService
{
    private readonly ISchedulerFactory _schedulerFactory;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="schedulerFactory">Factory the scheduler is obtained from when a job is scheduled.</param>
    public JobSchedulerService(ISchedulerFactory schedulerFactory)
    {
        // The scheduler used to be fetched here with `.Result`, blocking a thread inside a
        // constructor. It is now resolved asynchronously at the point of use.
        _schedulerFactory = schedulerFactory;
    }

    /// <summary>
    /// Schedules a <see cref="DistributedQuartzJob"/> with a trigger that starts at <paramref name="startTime"/>.
    /// </summary>
    /// <param name="jobName">Name part of the job identity.</param>
    /// <param name="jobGroup">Group part of the job identity.</param>
    /// <param name="triggerName">Name part of the trigger identity.</param>
    /// <param name="triggerGroup">Group part of the trigger identity.</param>
    /// <param name="startTime">The time at which the trigger starts.</param>
    /// <returns>A task that completes once the job and trigger have been handed to the scheduler.</returns>
    public async Task ScheduleJob(string jobName, string jobGroup, string triggerName, string triggerGroup, DateTimeOffset startTime)
    {
        // NOTE(review): relies on the factory handing back the same scheduler instance that
        // QuartzHostedService started; this holds for StdSchedulerFactory per the Quartz docs.
        // Confirm if a different ISchedulerFactory is ever registered.
        var scheduler = await _schedulerFactory.GetScheduler();

        var job = JobBuilder.Create<DistributedQuartzJob>()
            .WithIdentity(jobName, jobGroup)
            .Build();

        // No schedule is configured on the trigger, so Quartz's default trigger schedule applies.
        var trigger = TriggerBuilder.Create()
            .WithIdentity(triggerName, triggerGroup)
            .StartAt(startTime)
            .Build();

        await scheduler.ScheduleJob(job, trigger);
    }
}


/// <summary>
/// Quartz job that writes an execution message to the console and publishes the same
/// message to RabbitMQ.
/// </summary>
public class DistributedQuartzJob : IJob
{
    private readonly RabbitMqJobPublisher _publisher;

    /// <summary>
    /// Creates the job.
    /// </summary>
    /// <param name="rabbitMqJobPublisher">Publisher used to put the execution message on the queue.</param>
    public DistributedQuartzJob(RabbitMqJobPublisher rabbitMqJobPublisher)
    {
        _publisher = rabbitMqJobPublisher;
    }

    /// <summary>
    /// Runs the job: builds the execution message, logs it and publishes it.
    /// </summary>
    /// <param name="context">The Quartz execution context (unused).</param>
    /// <returns>An already-completed task; all work happens synchronously.</returns>
    public Task Execute(IJobExecutionContext context)
    {
        var announcement = $"DistributedJob executed at {DateTime.Now}";

        Console.WriteLine(announcement);

        // Hand the same text to the queue for the runner nodes.
        _publisher.PublishJob(announcement);

        return Task.CompletedTask;
    }
}

RabbitMqJobPublisher.cs

using Newtonsoft.Json;

using RabbitMQ.Client;

using System.Text;

using DistributedScheduler.Jobs;


namespace ManagerNode

{

    /// <summary>
    /// Envelope pairing a type-name string with an arbitrary payload object.
    /// </summary>
    /// <remarks>
    /// NOTE(review): not referenced anywhere in the visible code; presumably intended for
    /// type-tagged queue messages. Confirm before relying on it.
    /// </remarks>
    public class TypedObject

    {

        /// <summary>Type name for the payload; presumably identifies the type of <see cref="Data"/> (format not established here).</summary>
        public required string Type { get; set; }

        /// <summary>The payload object.</summary>
        public required object Data { get; set; }

    }


    public class RabbitMqJobPublisher

    {

        private readonly IConnection _connection;

        private readonly IModel _channel;


        public RabbitMqJobPublisher()

        {

            var factory = new ConnectionFactory() { HostName = "localhost" };

            _connection = factory.CreateConnection();

            _channel = _connection.CreateModel();


            _channel.QueueDeclare(queue: "job_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);

        }


        public void PublishJob(IDistributedJob job)

        {

            var message = JsonConvert.SerializeObject(job, Formatting.Indented);

            var body = Encoding.UTF8.GetBytes(message);


            _channel.BasicPublish(exchange: "",

                                 routingKey: "job_queue",

                                 basicProperties: null,

                                 body: body);

        }


        public void PublishJob(string jobString)

        {

            var message = jobString;

            var body = Encoding.UTF8.GetBytes(message);


            _channel.BasicPublish(exchange: "",

                                 routingKey: "job_queue",

                                 basicProperties: null,

                                 body: body);

        }

    }

}


JobController.cs


using DistributedScheduler.Jobs;
using Microsoft.AspNetCore.Mvc;

namespace ManagerNode.Controllers
{
    [ApiController]
    [Route("api/manager/job")]
    public class JobController : ControllerBase
    {
        private readonly JobSchedulerService _jobSchedulerService;

        public JobController(JobSchedulerService jobSchedulerService)
        {
            _jobSchedulerService = jobSchedulerService;
        }

        [HttpPost]
        public async Task<IActionResult> SubmitJob([FromBody] DistributedJob job)
        {

            if (_jobSchedulerService is null)
                throw new ArgumentNullException(nameof(_jobSchedulerService));

            await _jobSchedulerService.ScheduleJob(job.Name, job.Name, job.Name, job.Name, new DateTimeOffset(DateTime.Now.AddMinutes(1)));

            return Ok(new { job.Id, Status = "Job submitted and published to queue successfully" });
        }
    }

}

ManagerController.cs

using Microsoft.AspNetCore.Mvc;

/// <summary>
/// Health and status endpoints of the manager node.
/// </summary>
[ApiController]
[Route("api/manager")]
public class ManagerController : ControllerBase
{
    /// <summary>
    /// Returns a health summary. The task and runner counts are hard-coded example values.
    /// </summary>
    /// <returns>200 OK with the health summary object.</returns>
    [HttpGet("health")]
    public IActionResult Health() =>
        Ok(new
        {
            Status = "Healthy",
            PendingTasks = 5, // Example value
            ConnectedRunners = 10 // Example value
        });

    /// <summary>
    /// Returns a fixed status message; more detailed status data could be returned here.
    /// </summary>
    /// <returns>200 OK with the status text.</returns>
    [HttpGet("status")]
    public IActionResult Status() =>
        Ok("Manager is operational with all systems functioning.");
}

Runner

startup.cs

using Microsoft.AspNetCore.Builder;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;


/// <summary>
/// Configures the runner node: MVC controllers and the queue consumer.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers the runner's services with the dependency-injection container.
    /// </summary>
    /// <param name="services">The service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        services.AddSingleton<JobConsumerService>(); // Register JobConsumerService
    }

    /// <summary>
    /// Builds the HTTP request pipeline and forces the queue consumer to be created.
    /// </summary>
    /// <param name="app">The application builder the middleware is added to.</param>
    /// <param name="env">The hosting environment, used to enable the developer exception page.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });

        // The consumer subscribes to the queue in its constructor, and nothing else resolves
        // it, so it must be instantiated here. GetRequiredService throws if the registration
        // is ever removed, where GetService would return null and the runner would consume nothing.
        _ = app.ApplicationServices.GetRequiredService<JobConsumerService>();
    }
}


program.cs

using Microsoft.AspNetCore.Hosting;

using Microsoft.Extensions.Hosting;


/// <summary>
/// Entry point of the runner node.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds the host and runs it until shutdown.
    /// </summary>
    /// <param name="args">Command-line arguments passed through to the host builder.</param>
    public static void Main(string[] args)
    {
        IHostBuilder hostBuilder = CreateHostBuilder(args);
        hostBuilder.Build().Run();
    }

    /// <summary>
    /// Creates the default host builder with the web host configured to use <see cref="Startup"/>.
    /// </summary>
    /// <param name="args">Command-line arguments.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host
            .CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(web => web.UseStartup<Startup>());
    }
}

RunnerController.cs

using Microsoft.AspNetCore.Mvc;


namespace RunnerNode.Controllers

{

    [ApiController]

    [Route("api/runner")]

    public class RunnerController : ControllerBase

    {

        [HttpGet("health")]

        public IActionResult GetHealth()

        {

            // Implement health check logic

            return Ok("Runner Node is healthy");

        }


        [HttpGet("status")]

        public IActionResult GetStatus()

        {

            // Implement status retrieval logic

            return Ok("Status: Processing jobs");

        }

    }


}

JobConsumerService.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

/// <summary>
/// Consumes job messages from the durable <c>job_queue</c> queue on the RabbitMQ broker at
/// <c>localhost</c> and processes each one.
/// </summary>
/// <remarks>
/// Messages are acknowledged manually, and only after processing has finished, so a message
/// that is in flight when the process dies is redelivered by the broker instead of being lost.
/// The connection and channel are released in <see cref="Dispose"/>.
/// </remarks>
public class JobConsumerService : IDisposable
{
    private const string QueueName = "job_queue";

    private readonly IConnection _connection;
    private readonly IModel _channel;
    private bool _disposed;

    /// <summary>
    /// Connects to the broker, declares the job queue and starts consuming from it.
    /// </summary>
    public JobConsumerService()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();

        try
        {
            _channel = _connection.CreateModel();

            _channel.QueueDeclare(queue: QueueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (model, ea) => HandleDelivery(ea);

            _channel.BasicConsume(queue: QueueName,
                                 autoAck: false, // Acknowledge explicitly once the job has been handled
                                 consumer: consumer);
        }
        catch
        {
            // Do not leak the connection (or a half-initialized channel) when setup fails.
            _channel?.Dispose();
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Closes the channel and the connection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _channel.Dispose();
        _connection.Dispose();
    }

    // Decodes one delivery, processes it and settles it with the broker.
    private void HandleDelivery(BasicDeliverEventArgs ea)
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        if (ProcessJob(message))
        {
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        else
        {
            // A failed job is rejected without requeue, so a poison message cannot loop forever.
            // As before, the failed message is not retried.
            _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
        }
    }

    // Runs the job and reports whether it completed; failures are logged, not rethrown.
    private bool ProcessJob(string jobData)
    {
        try
        {
            Console.WriteLine($"Processing job: {jobData}");
            // Implement your job processing logic here
            Console.WriteLine($"Job processed successfully: {jobData}. Completed on {DateTime.Now}");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing job: {ex.Message}");
            // Handle exception (e.g., retry logic)
            return false;
        }
    }

}


Distributed Jobs Class Lib

DistributedJob.cs

/// <summary>
/// Job description submitted to the manager node; the request body of the job endpoint binds to this type.
/// </summary>
/// <remarks>
/// All string properties are <c>required</c>, so they must be set when an instance is initialized.
/// In the visible code only <see cref="Id"/> and <see cref="Name"/> are read.
/// </remarks>
public class DistributedJob : IDistributedJob
{
    /// <summary>Identifier of the job; echoed back in the submit response.</summary>
    public required string Id { get; set; }
    /// <summary>Name of the job; the job controller uses it as the Quartz job name, job group, trigger name and trigger group.</summary>
    public required string Name { get; set; }
    /// <summary>Free-text description of the job.</summary>
    public required string Description { get; set; }
    /// <summary>Type name associated with the job (NOTE(review): not read by the visible code; format unconfirmed).</summary>
    public required string TypeName { get; set; }

    /// <summary>Status text of the job (NOTE(review): allowed values are not defined in the visible code).</summary>
    public required string Status { get; set; }
    /// <summary>Cron schedule text (NOTE(review): not consumed by the visible scheduling code, which only sets a start time).</summary>
    public required string CronSchedule { get; set; }
    /// <summary>Creation timestamp, or null when not supplied.</summary>
    public DateTime? CreatedAt { get; set; }
}

/// <summary>
/// Contract for a job description; the RabbitMQ publisher accepts this interface and serializes the instance to JSON.
/// </summary>
public interface IDistributedJob
{
    /// <summary>Identifier of the job.</summary>
    string Id { get; set; }
    /// <summary>Name of the job.</summary>
    string Name { get; set; }
    /// <summary>Free-text description of the job.</summary>
    string Description { get; set; }
    /// <summary>Type name associated with the job (format not established in the visible code).</summary>
    string TypeName { get; set; }
    /// <summary>Status text of the job.</summary>
    string Status { get; set; }
    /// <summary>Cron schedule text for the job.</summary>
    string CronSchedule { get; set; }
    /// <summary>Creation timestamp, or null when not supplied.</summary>
    DateTime? CreatedAt { get; set; }
}


Run RabbitMQ with Docker. This is easier than installing it on your local machine.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Postman Collection

{
"info": {
"_postman_id": "108dded1-e54e-4999-974a-4bb8353b8da1",
"name": "Distributed Scheduler",
"schema": "https://schema.getpostman.com/json/collection/v2.0.0/collection.json",
"_exporter_id": "15322780"
},
"item": [
{
"name": "Manager",
"item": [
{
"name": "Job",
"item": [
{
"name": "New Request",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n    \"Id\": \"acfd5c51-fc47-4f26-8fb1-0c5253ef9e65\",\r\n    \"Name\": \"Job 1\",\r\n    \"Description\": \"Job One\",\r\n    \"status\": \"pending\",\r\n    \"TypeName\": \"DistributedJob, DistributedScheduler.Jobs\",\r\n    \"CronSchedule\": \"* * * * \"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": "http://localhost:5082/api/manager/job"
},
"response": []
}
]
},
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/status"
},
"response": []
}
]
},
{
"name": "Runner",
"item": [
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/status"
},
"response": []
}
]
}
]
}

No comments: