EventServiceGrpc stop function

Hi team,

I’m making good progress implementing a FastAPI app with a LiDAR service for the SICK LMS4000, but I am wondering if there’s a defined/supported way to implement a stop() function for the service.

I’ve essentially wrapped this example from SICK in an EventServiceGrpc class which autoruns with the system, but it might be neater to have it start and stop with the app. There is some clean shutdown code in the linked Python example which I would like to include in the service, so that it runs when the app stops.

Hi @gsainsbury86 ,

That is excellent progress! Here are a few things that should help you keep pushing:

journalctl / systemctl calls

You can use journalctl calls to monitor service logs and status.

You can use systemctl calls to manually start, stop, restart, etc. your custom services from a terminal.

Replace farm-ng-user-<user_name> with the user the service belongs to and custom-service with the service’s name when you run any of the below commands.

You can also wrap the systemctl commands and make these calls from within your code with, e.g., Python’s subprocess module (though be aware that these calls block until the command returns).
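As a rough illustration of wrapping those calls, here is a minimal sketch. The helper names (systemctl_user_command, run_systemctl_user), the allowed-action list, and the timeout value are assumptions made for this example and are not part of any farm-ng API. Only the `systemctl --user <action> <unit>` command form comes from the commands shown in this post.

```python
import subprocess

# Actions this hypothetical helper is willing to run.
ALLOWED_ACTIONS = ("start", "stop", "restart", "status")


def systemctl_user_command(action, unit):
    """Build the argument list for `systemctl --user <action> <unit>`."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError(f"unsupported systemctl action: {action!r}")
    return ["systemctl", "--user", action, unit]


def run_systemctl_user(action, unit, timeout_s=10.0):
    """Run the command and return the CompletedProcess.

    subprocess.run blocks the calling thread until the command exits
    (or the timeout expires), so avoid calling this directly from a
    coroutine running on the event loop.
    """
    return subprocess.run(
        systemctl_user_command(action, unit),
        capture_output=True,
        text=True,
        timeout=timeout_s,
        check=False,  # inspect .returncode instead of raising
    )
```

For example, run_systemctl_user("stop", "custom-service.service") would run the same stop command as the terminal version. From async code, one option is to push the call to a worker thread with loop.run_in_executor so the event loop stays responsive.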

User the custom service belongs to

There is no need to run the commands as a different user if it is your user and the service belongs to you.

Monitor

journalctl -f --user-unit custom-service.service

start, stop, restart:

systemctl --user stop custom-service.service

Other users

Other users must run the commands as the user the service belongs to.

Monitor

sudo -u farm-ng-user-<user_name> journalctl -f --user-unit custom-service.service

start, stop, restart:

sudo -u farm-ng-user-<user_name> XDG_RUNTIME_DIR=/run/user/$(id -u farm-ng-user-<user_name>) systemctl --user stop custom-service.service

Defined in your manifest.json

You should be able to define what you are trying to do in your manifest.json file. You can try setting autostart: false for your service and provide it as a dependency for your app’s service.

I’m glad to hear about how you’re progressing!
Kyle

FYI - we’ve updated the documentation to make the journalctl & systemctl instructions clearer. Please see:

Thanks Kyle!

That part all made sense, and I’d actually had success troubleshooting and running the app and service on the device. My specific question was what the entry point for the code is when I tell systemctl to stop it, i.e. does the event service class have a stop() function that gets called, or should I be trying to catch a SIGHUP or something else to run shutdown code? When I start it, for example, it calls run() and serve(), right?

The issue is that the lidar itself gets into a funny state if I don’t properly run the driver close procedure.

OK I understand the question now. Short answer: yes, you should be trying to catch a signal and run some shutdown code that cleanly closes / shuts down the device.

You can check out the (current latest release’s) implementation of the EventServiceGrpc class and see that there is no explicit stop() or close() method. We haven’t seen any issues with an unclean disconnection of the gRPC server.

You should already be explicitly calling the .serve() method to start the service in your code and be running something like:

import asyncio

async def main(event_service, your_device_driver):
    # Run the device driver and the gRPC event service concurrently
    async_tasks: list[asyncio.Task] = []
    async_tasks.append(asyncio.create_task(your_device_driver.your_run_function()))
    async_tasks.append(asyncio.create_task(event_service.serve()))
    await asyncio.gather(*async_tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # create event_service & your_device_driver

    loop.run_until_complete(main(event_service, your_device_driver))

Possible solution

So I think what you are trying to do is distinct from the event_service and is about your sensor. Hopefully the driver you are using already has some method for closing the device cleanly, and you just need to figure out how to detect when that should occur and plumb that in.

If you want to catch the signal when your service is being stopped, you can try something like:

WARNING: This is untested, so it might require a bit of debugging, but should get you on the right path

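import asyncio
import signal

async def shutdown(your_device_driver, main_task):
    print("Shutdown initiated...")
    await your_device_driver.close_device()
    print("Shutdown complete.")
    # Cancel main() so run_until_complete() returns; calling loop.stop() here
    # would make run_until_complete() raise a RuntimeError instead
    main_task.cancel()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # create event_service & your_device_driver

    main_task = loop.create_task(main(event_service, your_device_driver))

    # Setup shutdown signal handlers
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig, lambda: loop.create_task(shutdown(your_device_driver, main_task))
        )

    try:
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        pass  # expected after a shutdown signal
    finally:
        loop.close()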
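Another way to structure this, as a minimal sketch only: have the signal handler set an asyncio.Event, and close the device in a finally block once the tasks have been cancelled. Everything named here (FakeDriver, fake_serve, run_until_signalled, demo) is a hypothetical stand-in so the pattern can be run without the real driver or EventServiceGrpc. It relies on loop.add_signal_handler, which is only available on Unix and must be called from the main thread.

```python
import asyncio
import os
import signal


class FakeDriver:
    """Hypothetical stand-in for the real LiDAR driver."""

    def __init__(self):
        self.closed = False

    async def run(self):
        while True:  # pretend to poll the device forever
            await asyncio.sleep(0.01)

    async def close_device(self):
        self.closed = True


async def fake_serve():
    """Hypothetical stand-in for event_service.serve()."""
    await asyncio.sleep(3600)


async def run_until_signalled(driver, serve):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    signals = (signal.SIGTERM, signal.SIGINT)
    for sig in signals:
        loop.add_signal_handler(sig, stop.set)

    tasks = [asyncio.create_task(driver.run()), asyncio.create_task(serve())]
    try:
        await stop.wait()  # returns once SIGTERM or SIGINT arrives
    finally:
        for task in tasks:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError from each task
        await asyncio.gather(*tasks, return_exceptions=True)
        await driver.close_device()
        for sig in signals:
            loop.remove_signal_handler(sig)


def demo():
    """Send SIGTERM to this process shortly after start-up."""
    driver = FakeDriver()

    async def scenario():
        loop = asyncio.get_running_loop()
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
        await run_until_signalled(driver, fake_serve)

    asyncio.run(scenario())
    return driver.closed
```

The demo sends the same signal that systemctl stop sends by default (SIGTERM), so it exercises the same path as stopping the service. Note that this sketch only waits on the stop event, so a task that fails early would go unnoticed until shutdown.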

I hope this helps!

– Kyle

Oh perfect, that’s exactly what I was looking for. Thank you!

I’ve got plenty of experience with Python and FastAPI but very little with grpc and not a heap with asyncio, so some parts are slow going. I’ll implement this and it’ll hopefully have some reliability improvements.

On that note, perhaps another quick implementation question. The SICK drivers/API work by creating a connection to the device and registering a callback which receives messages from the scanner. My first implementation had the callback process those results and publish them with the event service. While I was developing, I had the process sleep for a second between each message, but when I remove the sleep and try to process all the messages, the brain can’t keep up, so I was looking to just publish the unprocessed messages, buffer them on the receiving side, and process them once the scan finishes. The difficulty there is that the messages are ctypes.Structure objects with a few nested objects and pointers, so converting them to something that works with protobuf was difficult and/or clunky.

My current app is built as a lidar-service which is the EventServiceGrpc implementation which publishes all the data and lidar-app which is the FastAPI frontend. I’ve been thinking about how to solve the problem and maybe a better solution would be for the lidar-service to be buffering/saving the data rather than publishing the full stream. Do you have any thoughts?

Is there any way to convert the message / Structure to/from raw bytes? If so, you could create or use a proto message with a generic bytes field to stream the messages (we have a lot of examples of these in farm-ng-core/protos/farm_ng/core/event_service.proto in the farm-ng/farm-ng-core GitHub repository).
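For what it’s worth, ctypes can do the raw-bytes part for flat structures. Here is a minimal sketch, assuming a made-up message layout: ScanHeader, Scan, and their fields are hypothetical and are not the real SICK message definition. One caveat it tries to show is that a pointer field only holds an address, so the data it points to has to be copied out separately.

```python
import ctypes


class ScanHeader(ctypes.Structure):
    # Hypothetical flat header with no pointers
    _fields_ = [
        ("scan_number", ctypes.c_uint32),
        ("num_points", ctypes.c_uint32),
    ]


class Scan(ctypes.Structure):
    # Hypothetical message: a nested header plus a pointer to the samples
    _fields_ = [
        ("header", ScanHeader),
        ("distances", ctypes.POINTER(ctypes.c_uint16)),
    ]


def scan_to_payloads(scan):
    """Copy the header and the pointed-to samples into bytes objects."""
    header_bytes = bytes(scan.header)  # raw copy of the header memory
    n_bytes = scan.header.num_points * ctypes.sizeof(ctypes.c_uint16)
    distance_bytes = ctypes.string_at(scan.distances, n_bytes)
    return header_bytes, distance_bytes


def payloads_to_values(header_bytes, distance_bytes):
    """Rebuild the header and the list of samples on the receiving side."""
    header = ScanHeader.from_buffer_copy(header_bytes)
    array_type = ctypes.c_uint16 * header.num_points
    distances = array_type.from_buffer_copy(distance_bytes)
    return header, list(distances)
```

The two bytes values could then go into bytes fields of a proto message. The raw layout depends on the platform’s endianness and struct padding, so this is most comfortable when both ends run the same architecture.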

Otherwise, we would most likely look at the fields required by the constructor of your Structure and write to_proto / from_proto methods that copy those values, so that the receiving side can build a separate but identical instance of the object. You can create a small library of to_proto / from_proto methods for the nested objects.
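To make the to_proto / from_proto idea concrete, here is a small sketch under loud assumptions: Point and Scan are made-up ctypes structures, and the PointProto / ScanProto dataclasses merely stand in for classes that protoc would generate from your own .proto file. The point is the shape of the per-object conversion functions, not the actual message layout.

```python
import ctypes
from dataclasses import dataclass, field
from typing import List


class Point(ctypes.Structure):
    # Hypothetical nested object
    _fields_ = [("angle", ctypes.c_float), ("distance", ctypes.c_float)]


class Scan(ctypes.Structure):
    # Hypothetical message holding a pointer to an array of Point
    _fields_ = [
        ("scan_number", ctypes.c_uint32),
        ("num_points", ctypes.c_uint32),
        ("points", ctypes.POINTER(Point)),
    ]


@dataclass
class PointProto:  # stand-in for a generated protobuf class
    angle: float
    distance: float


@dataclass
class ScanProto:  # stand-in for a generated protobuf class
    scan_number: int
    points: List[PointProto] = field(default_factory=list)


def point_to_proto(point):
    return PointProto(angle=point.angle, distance=point.distance)


def point_from_proto(msg):
    return Point(msg.angle, msg.distance)


def scan_to_proto(scan):
    # Follow the pointer and convert each nested Point by value
    points = [point_to_proto(scan.points[i]) for i in range(scan.num_points)]
    return ScanProto(scan_number=scan.scan_number, points=points)


def scan_from_proto(msg):
    # Allocate a fresh array; ctypes keeps it referenced through the cast
    # pointer stored in the returned structure
    array = (Point * len(msg.points))(*[point_from_proto(p) for p in msg.points])
    return Scan(msg.scan_number, len(msg.points),
                ctypes.cast(array, ctypes.POINTER(Point)))
```

With real generated classes, the conversion functions would fill in and read the message fields in the same way, one pair of functions per nested object.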

That is definitely an option, but being able to stream to another computer for live processing seems like it would be much preferred.

Thanks for the suggestion, Kyle. I was able to implement a to_proto() and from_proto() method and create a full .proto representation of the LiDAR’s message object. This seems to have worked well at this stage.

Great to hear! I’ll mark that as the solution :grinning: